Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Break queue segment chain if no enumeration is in progress #48296

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyColle
private volatile ConcurrentQueueSegment<T> _tail;
/// <summary>The current head segment.</summary>
private volatile ConcurrentQueueSegment<T> _head; // SOS's ThreadPool command depends on this name
/// <summary>
/// The number of collection enumerations that are currently in progress. If this is 0, we can safely break the chaining
/// of queue segments, when dequeuing the last element.
/// </summary>
private int pendingEnumerations;

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
Expand Down Expand Up @@ -229,7 +234,7 @@ public T[] ToArray()
T[] arr = new T[count];

// Now enumerate the contents, copying each element into the array.
using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail))
using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail, this))
{
int i = 0;
while (e.MoveNext())
Expand Down Expand Up @@ -458,7 +463,7 @@ public void CopyTo(T[] array, int index)

// Copy the items to the target array
int i = index;
using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail))
using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail, this))
{
while (e.MoveNext())
{
Expand All @@ -480,7 +485,7 @@ public void CopyTo(T[] array, int index)
public IEnumerator<T> GetEnumerator()
{
SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail);
return Enumerate(head, headHead, tail, tailTail);
return Enumerate(head, headHead, tail, tailTail, this);
}

/// <summary>
Expand Down Expand Up @@ -514,6 +519,8 @@ private void SnapForObservation(out ConcurrentQueueSegment<T> head, out int head

headHead = Volatile.Read(ref head._headAndTail.Head);
tailTail = Volatile.Read(ref tail._headAndTail.Tail);

Interlocked.Increment (ref pendingEnumerations);
}
}

Expand All @@ -540,59 +547,66 @@ private static T GetItemWhenAvailable(ConcurrentQueueSegment<T> segment, int i)
return segment._slots[i].Item!;
}

private static IEnumerator<T> Enumerate(ConcurrentQueueSegment<T> head, int headHead, ConcurrentQueueSegment<T> tail, int tailTail)
private static IEnumerator<T> Enumerate(ConcurrentQueueSegment<T> head, int headHead, ConcurrentQueueSegment<T> tail, int tailTail, ConcurrentQueue<T> queue)
{
Debug.Assert(head._preservedForObservation);
Debug.Assert(head._frozenForEnqueues);
Debug.Assert(tail._preservedForObservation);
Debug.Assert(tail._frozenForEnqueues);

// Head segment. We've already marked it as not accepting any more enqueues,
// so its tail position is fixed, and we've already marked it as preserved for
// enumeration (before we grabbed its head), so we can safely enumerate from
// its head to its tail.
int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset;
if (headHead < headTail)
try
{
headHead &= head._slotsMask;
headTail &= head._slotsMask;

// Head segment. We've already marked it as not accepting any more enqueues,
// so its tail position is fixed, and we've already marked it as preserved for
// enumeration (before we grabbed its head), so we can safely enumerate from
// its head to its tail.
int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset;
if (headHead < headTail)
{
for (int i = headHead; i < headTail; i++) yield return GetItemWhenAvailable(head, i);
}
else
{
for (int i = headHead; i < head._slots.Length; i++) yield return GetItemWhenAvailable(head, i);
for (int i = 0; i < headTail; i++) yield return GetItemWhenAvailable(head, i);
}
}

// We've enumerated the head. If the tail is the same, we're done.
if (head != tail)
{
// Each segment between head and tail, not including head and tail. Since there were
// segments before these, for our purposes we consider it to start at the 0th element.
for (ConcurrentQueueSegment<T> s = head._nextSegment!; s != tail; s = s._nextSegment!)
{
Debug.Assert(s._preservedForObservation, "Would have had to been preserved as a segment part of enumeration");
Debug.Assert(s._frozenForEnqueues, "Would have had to be frozen for enqueues as it's intermediate");
headHead &= head._slotsMask;
headTail &= head._slotsMask;

int sTail = s._headAndTail.Tail - s.FreezeOffset;
for (int i = 0; i < sTail; i++)
if (headHead < headTail)
{
for (int i = headHead; i < headTail; i++) yield return GetItemWhenAvailable(head, i);
}
else
{
yield return GetItemWhenAvailable(s, i);
for (int i = headHead; i < head._slots.Length; i++) yield return GetItemWhenAvailable(head, i);
for (int i = 0; i < headTail; i++) yield return GetItemWhenAvailable(head, i);
}
}

// Enumerate the tail. Since there were segments before this, we can just start at
// its beginning, and iterate until the tail we already grabbed.
tailTail -= tail.FreezeOffset;
for (int i = 0; i < tailTail; i++)
// We've enumerated the head. If the tail is the same, we're done.
if (head != tail)
{
yield return GetItemWhenAvailable(tail, i);
// Each segment between head and tail, not including head and tail. Since there were
// segments before these, for our purposes we consider it to start at the 0th element.
for (ConcurrentQueueSegment<T> s = head._nextSegment!; s != tail; s = s._nextSegment!)
{
Debug.Assert(s._preservedForObservation, "Would have had to been preserved as a segment part of enumeration");
Debug.Assert(s._frozenForEnqueues, "Would have had to be frozen for enqueues as it's intermediate");

int sTail = s._headAndTail.Tail - s.FreezeOffset;
for (int i = 0; i < sTail; i++)
{
yield return GetItemWhenAvailable(s, i);
}
}

// Enumerate the tail. Since there were segments before this, we can just start at
// its beginning, and iterate until the tail we already grabbed.
tailTail -= tail.FreezeOffset;
for (int i = 0; i < tailTail; i++)
{
yield return GetItemWhenAvailable(tail, i);
}
}
}
finally
{
Interlocked.Decrement (ref queue.pendingEnumerations);
}
}

/// <summary>Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.</summary>
Expand Down Expand Up @@ -730,6 +744,11 @@ private bool TryDequeueSlow([MaybeNullWhen(false)] out T item)
if (head == _head)
{
_head = head._nextSegment;
// Attempt to break the chain of segments in order to prevent unbounded memory growth,
// in case one segment remains pinned in memory (which can happen with mono). We can't race
// with the start of an enumeration because it needs to first take the cross segement lock.
if (pendingEnumerations == 0)
head._nextSegment = null;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugh. This looks racy with other dequeuers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this will break dequeues. A dequeuer could have already grabbed the head and needs to be able to fully traverse all linked segments from there.

}
}
}
Expand Down