using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; namespace MatrixIO.Collections { /// /// Concurrent Priority Queue with fast O(log n) insertion based on a probabilistic Skip list. /// public class ConcurrentPriorityQueue : IEnumerable { #region Private Members private readonly object _syncRoot = new object(); private readonly IComparer _comparer; private readonly int _maxLevel; private readonly double _bias; private int _level; private readonly Node _head; private readonly Node _foot; private readonly Random _random = new Random(); private volatile int _count; // NOTE: Better implemented with a semaphore but that's not available on all platforms private readonly AutoResetEvent _itemReady = new AutoResetEvent(false); #endregion public int Count { get { return _count; } } public bool IsEmpty { get { return _count <= 0; } } public bool IsSynchronized { get { return true; } } public object SyncRoot { get { return _syncRoot; } } #region Constructors public ConcurrentPriorityQueue() : this(Comparer.Default) { } public ConcurrentPriorityQueue(int maxLevel) : this(Comparer.Default, maxLevel) { } public ConcurrentPriorityQueue(int maxLevel, double bias) : this(Comparer.Default, maxLevel, bias) { } public ConcurrentPriorityQueue(IComparer keyComparer) : this(keyComparer, 32) { } public ConcurrentPriorityQueue(IComparer keyComparer, int maxLevel) : this(keyComparer, maxLevel, 0.5D) { } public ConcurrentPriorityQueue(IComparer keyComparer, int maxLevel, double bias) { _comparer = keyComparer; _maxLevel = maxLevel; _bias = bias; _head = new Node(_maxLevel); _foot = new Node(0); for (var i = 0; i < _head.Forward.Length; i++) _head.Forward[i] = _foot; } #endregion public void Enqueue(TPriority priority, TItem value) { lock (_syncRoot) { var update = new Node[_level+1]; var x = _head; // Find the insertion point for (var i = _level; i >= 0; i--) { while (x.Forward[i] != _foot && _comparer.Compare(x.Forward[i].Key, priority) < 1) x = x.Forward[i]; // Store the path we used to get here update[i] = x; } x = x.Forward[0]; // Decide the level for the new node var level = 1; while (_random.NextDouble() < _bias && level < _maxLevel && level <= _level) level++; if (level > _level) { for (var i = _level + 1; i < level; i++) update[i] = _head; _level = level; } // Create the node x = new Node(level, priority, value); // Link the node for (var i = 0; i < level; i++) { x.Forward[i] = update[i].Forward[i]; update[i].Forward[i] = x; } _count++; } _itemReady.Set(); } public bool TryDequeue(out TItem item) { lock (_syncRoot) { if (!_head.Forward[0].Equals(_foot)) { var x = _head.Forward[0]; item = x.Item; for (var i = 0; i < x.Forward.Length; i++) _head.Forward[i] = x.Forward[i]; _count--; if (_count <= 0) _itemReady.Reset(); else _itemReady.Set(); return true; } } item = default(TItem); return false; } // Due to limitations of some platforms supported by Portable Libraries, // the initial lock is not included in the timeout. public bool TryDequeue(out TItem item, TimeSpan timeout) { lock (_syncRoot) { if (_itemReady.WaitOne(timeout)) if (TryDequeue(out item)) return true; else Debug.WriteLine("Failed to get item after wait of less than requested timeout."); } item = default(TItem); return false; } public bool TryPeek(out TItem item) { lock (_syncRoot) { if(_head.Forward[0] != _foot) { item = _head.Forward[0].Item; return true; } item = default(TItem); return false; } } public void CopyTo(TItem[] array, int arrayIndex) { var offset = arrayIndex; foreach(var value in this) array[offset++] = value; } public TItem[] ToArray() { lock(_syncRoot) { var items = new TItem[_count]; CopyTo(items, 0); return items; } } #region IEnumerable Implementation public IEnumerator GetEnumerator() { lock (_syncRoot) { var x = _head; while (!x.Forward[0].Equals(_foot)) { yield return x.Forward[0].Item; x = x.Forward[0]; } } } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return GetEnumerator(); } #endregion #region Node Class private class Node { public readonly TPriority Key; public readonly TItem Item; public readonly Node[] Forward; public Node(int level) { Forward = new Node[level]; } public Node(int level, TPriority key, TItem item) : this(level) { Key = key; Item = item; } } #endregion } }