ConcurrentPriorityQueue.cs 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Threading;
  5. namespace MatrixIO.Collections
  6. {
  7. /// <summary>
  8. /// Concurrent Priority Queue with fast O(log n) insertion based on a probabilistic Skip list.
  9. /// </summary>
  10. public class ConcurrentPriorityQueue<TPriority, TItem> : IEnumerable<TItem>
  11. {
  12. #region Private Members
  13. private readonly object _syncRoot = new object();
  14. private readonly IComparer<TPriority> _comparer;
  15. private readonly int _maxLevel;
  16. private readonly double _bias;
  17. private int _level;
  18. private readonly Node _head;
  19. private readonly Node _foot;
  20. private readonly Random _random = new Random();
  21. private volatile int _count;
  22. // NOTE: Better implemented with a semaphore but that's not available on all platforms
  23. private readonly AutoResetEvent _itemReady = new AutoResetEvent(false);
  24. #endregion
  25. public int Count { get { return _count; } }
  26. public bool IsEmpty { get { return _count <= 0; } }
  27. public bool IsSynchronized { get { return true; } }
  28. public object SyncRoot { get { return _syncRoot; } }
  29. #region Constructors
  30. public ConcurrentPriorityQueue() : this(Comparer<TPriority>.Default) { }
  31. public ConcurrentPriorityQueue(int maxLevel) : this(Comparer<TPriority>.Default, maxLevel) { }
  32. public ConcurrentPriorityQueue(int maxLevel, double bias) : this(Comparer<TPriority>.Default, maxLevel, bias) { }
  33. public ConcurrentPriorityQueue(IComparer<TPriority> keyComparer) : this(keyComparer, 32) { }
  34. public ConcurrentPriorityQueue(IComparer<TPriority> keyComparer, int maxLevel) : this(keyComparer, maxLevel, 0.5D) { }
  35. public ConcurrentPriorityQueue(IComparer<TPriority> keyComparer, int maxLevel, double bias)
  36. {
  37. _comparer = keyComparer;
  38. _maxLevel = maxLevel;
  39. _bias = bias;
  40. _head = new Node(_maxLevel);
  41. _foot = new Node(0);
  42. for (var i = 0; i < _head.Forward.Length; i++) _head.Forward[i] = _foot;
  43. }
  44. #endregion
  45. public void Enqueue(TPriority priority, TItem value)
  46. {
  47. lock (_syncRoot)
  48. {
  49. var update = new Node[_level+1];
  50. var x = _head;
  51. // Find the insertion point
  52. for (var i = _level; i >= 0; i--)
  53. {
  54. while (x.Forward[i] != _foot && _comparer.Compare(x.Forward[i].Key, priority) < 1)
  55. x = x.Forward[i];
  56. // Store the path we used to get here
  57. update[i] = x;
  58. }
  59. x = x.Forward[0];
  60. // Decide the level for the new node
  61. var level = 1;
  62. while (_random.NextDouble() < _bias && level < _maxLevel && level <= _level) level++;
  63. if (level > _level)
  64. {
  65. for (var i = _level + 1; i < level; i++)
  66. update[i] = _head;
  67. _level = level;
  68. }
  69. // Create the node
  70. x = new Node(level, priority, value);
  71. // Link the node
  72. for (var i = 0; i < level; i++)
  73. {
  74. x.Forward[i] = update[i].Forward[i];
  75. update[i].Forward[i] = x;
  76. }
  77. _count++;
  78. }
  79. _itemReady.Set();
  80. }
  81. public bool TryDequeue(out TItem item)
  82. {
  83. lock (_syncRoot)
  84. {
  85. if (!_head.Forward[0].Equals(_foot))
  86. {
  87. var x = _head.Forward[0];
  88. item = x.Item;
  89. for (var i = 0; i < x.Forward.Length; i++)
  90. _head.Forward[i] = x.Forward[i];
  91. _count--;
  92. if (_count <= 0) _itemReady.Reset();
  93. else _itemReady.Set();
  94. return true;
  95. }
  96. }
  97. item = default(TItem);
  98. return false;
  99. }
  100. // Due to limitations of some platforms supported by Portable Libraries,
  101. // the initial lock is not included in the timeout.
  102. public bool TryDequeue(out TItem item, TimeSpan timeout)
  103. {
  104. lock (_syncRoot)
  105. {
  106. if (_itemReady.WaitOne(timeout))
  107. if (TryDequeue(out item)) return true;
  108. else Debug.WriteLine("Failed to get item after wait of less than requested timeout.");
  109. }
  110. item = default(TItem);
  111. return false;
  112. }
  113. public bool TryPeek(out TItem item)
  114. {
  115. lock (_syncRoot)
  116. {
  117. if(_head.Forward[0] != _foot)
  118. {
  119. item = _head.Forward[0].Item;
  120. return true;
  121. }
  122. item = default(TItem);
  123. return false;
  124. }
  125. }
  126. public void CopyTo(TItem[] array, int arrayIndex)
  127. {
  128. var offset = arrayIndex;
  129. foreach(var value in this)
  130. array[offset++] = value;
  131. }
  132. public TItem[] ToArray()
  133. {
  134. lock(_syncRoot)
  135. {
  136. var items = new TItem[_count];
  137. CopyTo(items, 0);
  138. return items;
  139. }
  140. }
  141. #region IEnumerable Implementation
  142. public IEnumerator<TItem> GetEnumerator()
  143. {
  144. lock (_syncRoot)
  145. {
  146. var x = _head;
  147. while (!x.Forward[0].Equals(_foot))
  148. {
  149. yield return x.Forward[0].Item;
  150. x = x.Forward[0];
  151. }
  152. }
  153. }
  154. System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  155. {
  156. return GetEnumerator();
  157. }
  158. #endregion
  159. #region Node Class
  160. private class Node
  161. {
  162. public readonly TPriority Key;
  163. public readonly TItem Item;
  164. public readonly Node[] Forward;
  165. public Node(int level)
  166. {
  167. Forward = new Node[level];
  168. }
  169. public Node(int level, TPriority key, TItem item) : this(level)
  170. {
  171. Key = key;
  172. Item = item;
  173. }
  174. }
  175. #endregion
  176. }
  177. }