From c4905a8fd026343aaa8909ebb8cd0b885a77e7e2 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Sun, 2 Feb 2025 09:20:26 +0200 Subject: [PATCH] AtomicCounter opaque methods --- .../concurrent/status/AtomicCounter.java | 175 +++++++++++++++++- 1 file changed, 169 insertions(+), 6 deletions(-) diff --git a/agrona/src/main/java/org/agrona/concurrent/status/AtomicCounter.java b/agrona/src/main/java/org/agrona/concurrent/status/AtomicCounter.java index 9a4d99f61..0c40bd63c 100644 --- a/agrona/src/main/java/org/agrona/concurrent/status/AtomicCounter.java +++ b/agrona/src/main/java/org/agrona/concurrent/status/AtomicCounter.java @@ -27,6 +27,14 @@ /** * Atomic counter that is backed by an {@link AtomicBuffer} that can be read across threads and processes. + *

+ * In most cases you want to pair the appropriate methods for ordering. E.g. + *

    + *
  1. an {@link #increment} (which has volatile semantics) should be combined with a {@link #get}.
  2. + *
  3. an {@link #incrementRelease} with a {@link #getAcquire}.
  4. + *
  5. an {@link #incrementOpaque} with a {@link #getOpaque}.
  6. + *
  7. an {@link #incrementPlain} with a {@link #getPlain}.
  8. + *
*/ public class AtomicCounter implements AutoCloseable { @@ -222,18 +230,22 @@ public long incrementOrdered() } /** - * Perform a non-atomic increment. + * Perform a non-atomic increment with release semantics. *

* It can result into lost updates due to race condition when called concurrently. *

* The load has plain memory semantics and the store has release memory semantics. *

- * The typical use-case is when there is a single writer thread and one or more reader threads. + * The typical use-case is when there is a single writer thread and one or more reader threads and causality + * needs to be preserved using the {@link #getAcquire()}. *

- * This method will outperform the {@link #increment()}. So if there is just a single mutator thread, and + * This method is likely to outperform the {@link #increment()}. So if there is just a single mutator thread, and * one or more reader threads, then it is likely you will prefer this method. + *

+ * If no memory ordering is needed, have a look at the {@link #incrementOpaque()}. * * @return the previous value of the counter + * @since 2.1.0 */ public long incrementRelease() { @@ -244,6 +256,32 @@ public long incrementRelease() return currentValue; } + /** + * Perform a non-atomic increment using opaque semantics. + *

+ * It can result into lost updates due to race condition when called concurrently. + *

+ * The load has plain memory semantics and the store has opaque memory semantics. + *

+ * The typical use-case is when there is a single writer thread and one or more reader threads and surrounding + * loads/stores don't need to be ordered. + *

+ * This method should be at least fast as {@link #incrementRelease()} since it has weaker memory semantics. + * So if there is just a single mutator thread, and one or more reader threads, then it is likely you will + * prefer this method. + * + * @return the previous value of the counter + * @since 2.1.0 + */ + public long incrementOpaque() + { + final byte[] array = byteArray; + final long offset = addressOffset; + final long currentValue = UnsafeApi.getLong(array, offset); + UnsafeApi.putLongOpaque(array, offset, currentValue + 1); + return currentValue; + } + /** * Increment the counter. *

@@ -266,6 +304,8 @@ public long incrementPlain() /** * Perform an atomic decrement that will not lose updates across threads. + *

+ * The loads and store have volatile memory semantics. * * @return the previous value of the counter */ @@ -278,6 +318,7 @@ public long decrement() * Perform an atomic decrement that is not safe across threads. * * @return the previous value of the counter + * @since 2.1.0 */ public long decrementOrdered() { @@ -285,7 +326,7 @@ public long decrementOrdered() } /** - * Decrements the counter non-atomically. + * Decrements the counter non-atomically with release semantics. *

* It can result into lost updates to race condition when called concurrently. *

@@ -308,6 +349,33 @@ public long decrementRelease() return currentValue; } + /** + * Perform a non-atomic decrement using opaque semantics. + *

+ * It can result into lost updates due to race condition when called concurrently. + *

+ * The load has plain memory semantics and the store has opaque memory semantics. + *

+ * The typical use-case is when there is a single writer thread and one or more reader threads and surrounding + * loads and stores don't need to be ordered. + *

+ * This method should be at least fast as {@link #incrementRelease()} since it has weaker memory semantics. + * So if there is just a single mutator thread, and one or more reader threads, then it is likely you will + * prefer this method. + * + * @return the previous value of the counter + * @since 2.1.0 + */ + public long decrementOpaque() + { + final byte[] array = byteArray; + final long offset = addressOffset; + final long currentValue = UnsafeApi.getLong(array, offset); + UnsafeApi.putLongOpaque(array, offset, currentValue - 1); + + return currentValue; + } + /** * Decrements the counter. *

@@ -364,6 +432,19 @@ public void setRelease(final long value) UnsafeApi.putLongRelease(byteArray, addressOffset, value); } + /** + * Set the counter value atomically. + *

+ * The store has opaque memory semantics. + * + * @param value to be set + * @since 2.1.0 + */ + public void setOpaque(final long value) + { + UnsafeApi.putLongOpaque(byteArray, addressOffset, value); + } + /** * Set the counter with normal semantics. *

@@ -412,14 +493,14 @@ public long getAndAddOrdered(final long increment) } /** - * Adds an increment to the counter non atomically. + * Adds an increment to the counter non-atomically. *

* This method is not atomic; it can suffer from lost-updates due to race conditions. *

* The load has plain memory semantics and the store has release memory semantics. *

* The typical use-case is when there is one mutator thread, that calls this method, and one or more reader - * threads. + * threads. Typically, this method is combined with the {@link #getAcquire()} to read the value. * * @param increment to be added * @return the previous value of the counter @@ -435,6 +516,33 @@ public long getAndAddRelease(final long increment) return currentValue; } + /** + * Adds an increment to the counter non-atomically. + *

+ * This method is not atomic; it can suffer from lost-updates due to race conditions. + *

+ * The load has plain memory semantics and the store has opaque memory semantics. + *

+ * The typical use-case is when there is one mutator thread, that calls this method, and one or more reader + * threads. Typically, this method is combined with a {@link #getOpaque()} to read the value. + *

+ * If ordering of surrounding loads/stores isn't important, then this method is likely to be faster than + * {@link #getAndAddRelease(long)} because it has less strict memory ordering requirements. + * + * @param increment to be added + * @return the previous value of the counter + * @since 2.1.0 + */ + public long getAndAddOpaque(final long increment) + { + final byte[] array = byteArray; + final long offset = addressOffset; + final long currentValue = UnsafeApi.getLong(array, offset); + UnsafeApi.putLongOpaque(array, offset, currentValue + increment); + + return currentValue; + } + /** * Get the current value of a counter and atomically set it to a new value. * @@ -468,8 +576,32 @@ public long get() return UnsafeApi.getLongVolatile(byteArray, addressOffset); } + /** + * Get the value for the counter with acquire semantics. + * + * @return the value for the counter. + * @since 2.1.0 + */ + public long getAcquire() + { + return UnsafeApi.getLongAcquire(byteArray, addressOffset); + } + + /** + * Get the value for the counter with opaque semantics. + * + * @return the value for the counter. + * @since 2.1.0 + */ + public long getOpaque() + { + return UnsafeApi.getLongOpaque(byteArray, addressOffset); + } + /** * Get the value of the counter using weak ordering semantics. This is the same a standard read of a field. + *

+ * This call is identical to {@link #getPlain()} and that method is preferred. * * @return the value for the counter. */ @@ -551,6 +683,37 @@ public boolean proposeMaxRelease(final long proposedValue) return updated; } + /** + * Set the value to a new proposedValue if greater than the current value. + *

+ * This call is not atomic and can suffer from lost updates to race conditions. + *

+ * The load has plain memory semantics and the store has opaque memory semantics. + *

+ * The typical use-case is when there is one mutator thread, that calls this method, and one or more reader threads. + *

+ * This method is likely to outperform {@link #proposeMaxRelease(long)} since this method has less memory ordering + * requirements. + * + * @param proposedValue for the new max. + * @return true if a new max as been set otherwise false. + * @since 2.1.0 + */ + public boolean proposeMaxOpaque(final long proposedValue) + { + boolean updated = false; + + final byte[] array = byteArray; + final long offset = addressOffset; + if (UnsafeApi.getLong(array, offset) < proposedValue) + { + UnsafeApi.putLongOpaque(array, offset, proposedValue); + updated = true; + } + + return updated; + } + /** * {@inheritDoc} */