From 8115f4d4060e16db640ffaba572dbb597ce2ff8b Mon Sep 17 00:00:00 2001 From: Brezae Vlad Date: Fri, 12 Feb 2021 11:36:06 +0200 Subject: [PATCH] Break queue segment chain if no enumeration is in progress Mono doesn't precisely scan stack which means that, if a QueueSegment ref gets stuck in a stack slot over a long period of time, no future queue segments will be collected and the queue will leak. We break the segment chain when dequeuing the last element from a segment, if no enumerations are in progress. This and the starting of the enumeration are protected by the same lock, which means we cannot miss the start of an enumeration while we are preparing to null the link. --- .../Collections/Concurrent/ConcurrentQueue.cs | 97 +++++++++++-------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs index e45ed1e6a21b49..00ae1928954607 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs @@ -56,6 +56,11 @@ public class ConcurrentQueue : IProducerConsumerCollection, IReadOnlyColle private volatile ConcurrentQueueSegment _tail; /// The current head segment. private volatile ConcurrentQueueSegment _head; // SOS's ThreadPool command depends on this name + /// + /// The number of collection enumerations that are currently in progress. If this is 0, we can safely break the chaining + /// of queue segments, when dequeuing the last element. + /// + private int pendingEnumerations; /// /// Initializes a new instance of the class. @@ -229,7 +234,7 @@ public T[] ToArray() T[] arr = new T[count]; // Now enumerate the contents, copying each element into the array. - using (IEnumerator e = Enumerate(head, headHead, tail, tailTail)) + using (IEnumerator e = Enumerate(head, headHead, tail, tailTail, this)) { int i = 0; while (e.MoveNext()) @@ -458,7 +463,7 @@ public void CopyTo(T[] array, int index) // Copy the items to the target array int i = index; - using (IEnumerator e = Enumerate(head, headHead, tail, tailTail)) + using (IEnumerator e = Enumerate(head, headHead, tail, tailTail, this)) { while (e.MoveNext()) { @@ -480,7 +485,7 @@ public void CopyTo(T[] array, int index) public IEnumerator GetEnumerator() { SnapForObservation(out ConcurrentQueueSegment head, out int headHead, out ConcurrentQueueSegment tail, out int tailTail); - return Enumerate(head, headHead, tail, tailTail); + return Enumerate(head, headHead, tail, tailTail, this); } /// @@ -514,6 +519,8 @@ private void SnapForObservation(out ConcurrentQueueSegment head, out int head headHead = Volatile.Read(ref head._headAndTail.Head); tailTail = Volatile.Read(ref tail._headAndTail.Tail); + + Interlocked.Increment (ref pendingEnumerations); } } @@ -540,59 +547,66 @@ private static T GetItemWhenAvailable(ConcurrentQueueSegment segment, int i) return segment._slots[i].Item!; } - private static IEnumerator Enumerate(ConcurrentQueueSegment head, int headHead, ConcurrentQueueSegment tail, int tailTail) + private static IEnumerator Enumerate(ConcurrentQueueSegment head, int headHead, ConcurrentQueueSegment tail, int tailTail, ConcurrentQueue queue) { Debug.Assert(head._preservedForObservation); Debug.Assert(head._frozenForEnqueues); Debug.Assert(tail._preservedForObservation); Debug.Assert(tail._frozenForEnqueues); - // Head segment. We've already marked it as not accepting any more enqueues, - // so its tail position is fixed, and we've already marked it as preserved for - // enumeration (before we grabbed its head), so we can safely enumerate from - // its head to its tail. - int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset; - if (headHead < headTail) + try { - headHead &= head._slotsMask; - headTail &= head._slotsMask; - + // Head segment. We've already marked it as not accepting any more enqueues, + // so its tail position is fixed, and we've already marked it as preserved for + // enumeration (before we grabbed its head), so we can safely enumerate from + // its head to its tail. + int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset; if (headHead < headTail) { - for (int i = headHead; i < headTail; i++) yield return GetItemWhenAvailable(head, i); - } - else - { - for (int i = headHead; i < head._slots.Length; i++) yield return GetItemWhenAvailable(head, i); - for (int i = 0; i < headTail; i++) yield return GetItemWhenAvailable(head, i); - } - } - - // We've enumerated the head. If the tail is the same, we're done. - if (head != tail) - { - // Each segment between head and tail, not including head and tail. Since there were - // segments before these, for our purposes we consider it to start at the 0th element. - for (ConcurrentQueueSegment s = head._nextSegment!; s != tail; s = s._nextSegment!) - { - Debug.Assert(s._preservedForObservation, "Would have had to been preserved as a segment part of enumeration"); - Debug.Assert(s._frozenForEnqueues, "Would have had to be frozen for enqueues as it's intermediate"); + headHead &= head._slotsMask; + headTail &= head._slotsMask; - int sTail = s._headAndTail.Tail - s.FreezeOffset; - for (int i = 0; i < sTail; i++) + if (headHead < headTail) + { + for (int i = headHead; i < headTail; i++) yield return GetItemWhenAvailable(head, i); + } + else { - yield return GetItemWhenAvailable(s, i); + for (int i = headHead; i < head._slots.Length; i++) yield return GetItemWhenAvailable(head, i); + for (int i = 0; i < headTail; i++) yield return GetItemWhenAvailable(head, i); } } - // Enumerate the tail. Since there were segments before this, we can just start at - // its beginning, and iterate until the tail we already grabbed. - tailTail -= tail.FreezeOffset; - for (int i = 0; i < tailTail; i++) + // We've enumerated the head. If the tail is the same, we're done. + if (head != tail) { - yield return GetItemWhenAvailable(tail, i); + // Each segment between head and tail, not including head and tail. Since there were + // segments before these, for our purposes we consider it to start at the 0th element. + for (ConcurrentQueueSegment s = head._nextSegment!; s != tail; s = s._nextSegment!) + { + Debug.Assert(s._preservedForObservation, "Would have had to been preserved as a segment part of enumeration"); + Debug.Assert(s._frozenForEnqueues, "Would have had to be frozen for enqueues as it's intermediate"); + + int sTail = s._headAndTail.Tail - s.FreezeOffset; + for (int i = 0; i < sTail; i++) + { + yield return GetItemWhenAvailable(s, i); + } + } + + // Enumerate the tail. Since there were segments before this, we can just start at + // its beginning, and iterate until the tail we already grabbed. + tailTail -= tail.FreezeOffset; + for (int i = 0; i < tailTail; i++) + { + yield return GetItemWhenAvailable(tail, i); + } } } + finally + { + Interlocked.Decrement (ref queue.pendingEnumerations); + } } /// Adds an object to the end of the . @@ -730,6 +744,11 @@ private bool TryDequeueSlow([MaybeNullWhen(false)] out T item) if (head == _head) { _head = head._nextSegment; + // Attempt to break the chain of segments in order to prevent unbounded memory growth, + // in case one segment remains pinned in memory (which can happen with mono). We can't race + // with the start of an enumeration because it needs to first take the cross segement lock. + if (pendingEnumerations == 0) + head._nextSegment = null; } } }