Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AtomicCounter opaque methods #319

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 169 additions & 6 deletions agrona/src/main/java/org/agrona/concurrent/status/AtomicCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@

/**
* Atomic counter that is backed by an {@link AtomicBuffer} that can be read across threads and processes.
* <p>
* In most cases you want to pair the appropriate methods for ordering. E.g.
* <ol>
* <li>an {@link #increment} (which has volatile semantics) should be combined with a {@link #get}.</li>
* <li>an {@link #incrementRelease} with a {@link #getAcquire}.</li>
* <li>an {@link #incrementOpaque} with a {@link #getOpaque}.</li>
* <li>an {@link #incrementPlain} with a {@link #getPlain}.</li>
* </ol>
*/
public class AtomicCounter implements AutoCloseable
{
Expand Down Expand Up @@ -222,18 +230,22 @@ public long incrementOrdered()
}

/**
* Perform a non-atomic increment.
* Perform a non-atomic increment with release semantics.
* <p>
* It can result into lost updates due to race condition when called concurrently.
* <p>
* The load has plain memory semantics and the store has release memory semantics.
* <p>
* The typical use-case is when there is a single writer thread and one or more reader threads.
* The typical use-case is when there is a single writer thread and one or more reader threads and causality
* needs to be preserved using the {@link #getAcquire()}.
* <p>
* This method will outperform the {@link #increment()}. So if there is just a single mutator thread, and
* This method is likely to outperform the {@link #increment()}. So if there is just a single mutator thread, and
* one or more reader threads, then it is likely you will prefer this method.
* <p>
* If no memory ordering is needed, have a look at the {@link #incrementOpaque()}.
*
* @return the previous value of the counter
* @since 2.1.0
*/
public long incrementRelease()
{
Expand All @@ -244,6 +256,32 @@ public long incrementRelease()
return currentValue;
}

/**
* Perform a non-atomic increment using opaque semantics.
* <p>
* It can result into lost updates due to race condition when called concurrently.
* <p>
* The load has plain memory semantics and the store has opaque memory semantics.
* <p>
* The typical use-case is when there is a single writer thread and one or more reader threads and surrounding
* loads/stores don't need to be ordered.
* <p>
* This method should be at least fast as {@link #incrementRelease()} since it has weaker memory semantics.
* So if there is just a single mutator thread, and one or more reader threads, then it is likely you will
* prefer this method.
*
* @return the previous value of the counter
* @since 2.1.0
*/
public long incrementOpaque()
{
final byte[] array = byteArray;
final long offset = addressOffset;
final long currentValue = UnsafeApi.getLong(array, offset);
UnsafeApi.putLongOpaque(array, offset, currentValue + 1);
return currentValue;
}

/**
* Increment the counter.
* <p>
Expand All @@ -266,6 +304,8 @@ public long incrementPlain()

/**
* Perform an atomic decrement that will not lose updates across threads.
* <p>
* The loads and store have volatile memory semantics.
*
* @return the previous value of the counter
*/
Expand All @@ -278,14 +318,15 @@ public long decrement()
* Perform an atomic decrement that is not safe across threads.
*
* @return the previous value of the counter
* @since 2.1.0
*/
public long decrementOrdered()
{
return decrementRelease();
}

/**
* Decrements the counter non-atomically.
* Decrements the counter non-atomically with release semantics.
* <p>
* It can result into lost updates to race condition when called concurrently.
* <p>
Expand All @@ -308,6 +349,33 @@ public long decrementRelease()
return currentValue;
}

/**
* Perform a non-atomic decrement using opaque semantics.
* <p>
* It can result into lost updates due to race condition when called concurrently.
* <p>
* The load has plain memory semantics and the store has opaque memory semantics.
* <p>
* The typical use-case is when there is a single writer thread and one or more reader threads and surrounding
* loads and stores don't need to be ordered.
* <p>
* This method should be at least fast as {@link #incrementRelease()} since it has weaker memory semantics.
* So if there is just a single mutator thread, and one or more reader threads, then it is likely you will
* prefer this method.
*
* @return the previous value of the counter
* @since 2.1.0
*/
public long decrementOpaque()
{
final byte[] array = byteArray;
final long offset = addressOffset;
final long currentValue = UnsafeApi.getLong(array, offset);
UnsafeApi.putLongOpaque(array, offset, currentValue - 1);

return currentValue;
}

/**
* Decrements the counter.
* <p>
Expand Down Expand Up @@ -364,6 +432,19 @@ public void setRelease(final long value)
UnsafeApi.putLongRelease(byteArray, addressOffset, value);
}

/**
* Set the counter value atomically.
* <p>
* The store has opaque memory semantics.
*
* @param value to be set
* @since 2.1.0
*/
public void setOpaque(final long value)
{
UnsafeApi.putLongOpaque(byteArray, addressOffset, value);
}

/**
* Set the counter with normal semantics.
* <p>
Expand Down Expand Up @@ -412,14 +493,14 @@ public long getAndAddOrdered(final long increment)
}

/**
* Adds an increment to the counter non atomically.
* Adds an increment to the counter non-atomically.
* <p>
* This method is not atomic; it can suffer from lost-updates due to race conditions.
* <p>
* The load has plain memory semantics and the store has release memory semantics.
* <p>
* The typical use-case is when there is one mutator thread, that calls this method, and one or more reader
* threads.
* threads. Typically, this method is combined with the {@link #getAcquire()} to read the value.
*
* @param increment to be added
* @return the previous value of the counter
Expand All @@ -435,6 +516,33 @@ public long getAndAddRelease(final long increment)
return currentValue;
}

/**
* Adds an increment to the counter non-atomically.
* <p>
* This method is not atomic; it can suffer from lost-updates due to race conditions.
* <p>
* The load has plain memory semantics and the store has opaque memory semantics.
* <p>
* The typical use-case is when there is one mutator thread, that calls this method, and one or more reader
* threads. Typically, this method is combined with a {@link #getOpaque()} to read the value.
* <p>
* If ordering of surrounding loads/stores isn't important, then this method is likely to be faster than
* {@link #getAndAddRelease(long)} because it has less strict memory ordering requirements.
*
* @param increment to be added
* @return the previous value of the counter
* @since 2.1.0
*/
public long getAndAddOpaque(final long increment)
{
final byte[] array = byteArray;
final long offset = addressOffset;
final long currentValue = UnsafeApi.getLong(array, offset);
UnsafeApi.putLongOpaque(array, offset, currentValue + increment);

return currentValue;
}

/**
* Get the current value of a counter and atomically set it to a new value.
*
Expand Down Expand Up @@ -468,8 +576,32 @@ public long get()
return UnsafeApi.getLongVolatile(byteArray, addressOffset);
}

/**
* Get the value for the counter with acquire semantics.
*
* @return the value for the counter.
* @since 2.1.0
*/
public long getAcquire()
{
return UnsafeApi.getLongAcquire(byteArray, addressOffset);
}

/**
* Get the value for the counter with opaque semantics.
*
* @return the value for the counter.
* @since 2.1.0
*/
public long getOpaque()
{
return UnsafeApi.getLongOpaque(byteArray, addressOffset);
}

/**
* Get the value of the counter using weak ordering semantics. This is the same a standard read of a field.
* <p>
* This call is identical to {@link #getPlain()} and that method is preferred.
*
* @return the value for the counter.
*/
Expand Down Expand Up @@ -551,6 +683,37 @@ public boolean proposeMaxRelease(final long proposedValue)
return updated;
}

/**
* Set the value to a new proposedValue if greater than the current value.
* <p>
* This call is not atomic and can suffer from lost updates to race conditions.
* <p>
* The load has plain memory semantics and the store has opaque memory semantics.
* <p>
* The typical use-case is when there is one mutator thread, that calls this method, and one or more reader threads.
* <p>
* This method is likely to outperform {@link #proposeMaxRelease(long)} since this method has less memory ordering
* requirements.
*
* @param proposedValue for the new max.
* @return true if a new max as been set otherwise false.
* @since 2.1.0
*/
public boolean proposeMaxOpaque(final long proposedValue)
{
boolean updated = false;

final byte[] array = byteArray;
final long offset = addressOffset;
if (UnsafeApi.getLong(array, offset) < proposedValue)
{
UnsafeApi.putLongOpaque(array, offset, proposedValue);
updated = true;
}

return updated;
}

/**
* {@inheritDoc}
*/
Expand Down