From 153ed4582ccb92f5061d21a48eebf2cca12ea0ae Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Thu, 19 Oct 2023 16:34:17 -0500 Subject: [PATCH 1/5] Fix delta metric storage concurrency bug --- .../io/opentelemetry/sdk/metrics/TestSdk.java | 15 +++- .../DefaultSynchronousMetricStorage.java | 36 ++++++-- .../state/SynchronousMetricStorageTest.java | 88 +++++++++++++++++++ 3 files changed, 132 insertions(+), 7 deletions(-) diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java index 4f461d6fa63..1c8e760425d 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/TestSdk.java @@ -38,7 +38,7 @@ Meter build() { .get("io.opentelemetry.sdk.metrics"); } }), - SDK( + SDK_CUMULATIVE( new SdkBuilder() { @Override Meter build() { @@ -50,6 +50,19 @@ Meter build() { .build() .get("io.opentelemetry.sdk.metrics"); } + }), + SDK_DELTA( + new SdkBuilder() { + @Override + Meter build() { + return SdkMeterProvider.builder() + .setClock(Clock.getDefault()) + .setResource(Resource.empty()) + // Must register reader for real SDK. 
+ .registerMetricReader(InMemoryMetricReader.createDelta()) + .build() + .get("io.opentelemetry.sdk.metrics"); + } }); private final SdkBuilder sdkBuilder; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 3a359d75d23..cf43245637f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.StampedLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,7 +47,8 @@ public final class DefaultSynchronousMetricStorage aggregator; - private final ConcurrentHashMap> aggregatorHandles = + private final StampedLock sl = new StampedLock(); + private ConcurrentHashMap> aggregatorHandles = new ConcurrentHashMap<>(); private final AttributesProcessor attributesProcessor; @@ -83,8 +85,13 @@ Queue> getAggregatorHandlePool() { @Override public void recordLong(long value, Attributes attributes, Context context) { - AggregatorHandle handle = getAggregatorHandle(attributes, context); - handle.recordLong(value, attributes, context); + long stamp = sl.readLock(); + try { + AggregatorHandle handle = getAggregatorHandle(attributes, context); + handle.recordLong(value, attributes, context); + } finally { + sl.unlockRead(stamp); + } } @Override @@ -99,8 +106,13 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". 
Dropping measurement."); return; } - AggregatorHandle handle = getAggregatorHandle(attributes, context); - handle.recordDouble(value, attributes, context); + long stamp = sl.readLock(); + try { + AggregatorHandle handle = getAggregatorHandle(attributes, context); + handle.recordDouble(value, attributes, context); + } finally { + sl.unlockRead(stamp); + } } private AggregatorHandle getAggregatorHandle(Attributes attributes, Context context) { @@ -146,13 +158,25 @@ public MetricData collect( ? registeredReader.getLastCollectEpochNanos() : startEpochNanos; + ConcurrentHashMap> aggregatorHandles; + if (reset) { + long stamp = sl.writeLock(); + try { + aggregatorHandles = this.aggregatorHandles; + this.aggregatorHandles = new ConcurrentHashMap<>(); + } finally { + sl.unlockWrite(stamp); + } + } else { + aggregatorHandles = this.aggregatorHandles; + } + // Grab aggregated points. List points = new ArrayList<>(aggregatorHandles.size()); aggregatorHandles.forEach( (attributes, handle) -> { T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset); if (reset) { - aggregatorHandles.remove(attributes, handle); // Return the aggregator to the pool. 
aggregatorHandlePool.offer(handle); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index aa6fe1d3999..2d301eb6d88 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -12,6 +12,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.util.concurrent.AtomicDouble; +import com.google.common.util.concurrent.Uninterruptibles; import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -21,9 +23,11 @@ import io.opentelemetry.sdk.metrics.Aggregation; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.InstrumentValueType; +import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; @@ -37,8 +41,17 @@ import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.BiConsumer; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import 
org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.event.Level; @SuppressLogger(DefaultSynchronousMetricStorage.class) @@ -370,4 +383,79 @@ void recordAndCollect_DeltaAtLimit() { assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } + + @ParameterizedTest + @MethodSource("concurrentStressTestArguments") + void recordAndCollect_concurrentStressTest( + DefaultSynchronousMetricStorage storage, BiConsumer collect) { + // Define record threads. Each records a value of 1.0, 2000 times + List threads = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(4); + for (int i = 0; i < 4; i++) { + Thread thread = + new Thread( + () -> { + for (int j = 0; j < 2000; j++) { + storage.recordDouble(1.0, Attributes.empty(), Context.current()); + Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); + } + latch.countDown(); + }); + threads.add(thread); + } + + // Define collect thread. 
Collect thread collects and aggregates the point values into cumulativeSum. + AtomicDouble cumulativeSum = new AtomicDouble(); + Thread collectThread = + new Thread( + () -> { + do { + Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); + MetricData metricData = + storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1); + if (metricData.isEmpty()) { + continue; + } + metricData.getDoubleSumData().getPoints().stream() + .findFirst() + .ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum)); + } while (latch.getCount() != 0); + }); + + // Start all the threads + collectThread.start(); + threads.forEach(Thread::start); + + // Wait for the collect thread to end, which collects until the record threads are done + Uninterruptibles.joinUninterruptibly(collectThread); + + assertThat(cumulativeSum.get()).isEqualTo(8000.0); + } + + private static Stream concurrentStressTestArguments() { + Aggregator aggregator = + ((AggregatorFactory) Aggregation.sum()) + .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()); + return Stream.of( + Arguments.of( + // Delta + new DefaultSynchronousMetricStorage<>( + RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()), + METRIC_DESCRIPTOR, + aggregator, + AttributesProcessor.noop(), + CARDINALITY_LIMIT), + (BiConsumer) + (value, cumulativeCount) -> cumulativeCount.addAndGet(value)), + Arguments.of( + // Cumulative + new DefaultSynchronousMetricStorage<>( + RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()), + METRIC_DESCRIPTOR, + aggregator, + AttributesProcessor.noop(), + CARDINALITY_LIMIT), + (BiConsumer) + (value, cumulativeCount) -> cumulativeCount.set(value))); + } } From 8926439e800ed9f07c2c91caa87b44aae6149bfb Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Mon, 23 Oct 2023 12:55:31 -0500 Subject: [PATCH 2/5] Use ReentrantReadWriteLock --- .../DefaultSynchronousMetricStorage.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git 
a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index cf43245637f..eae1b6ce856 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -26,7 +26,9 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.StampedLock; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,7 +49,9 @@ public final class DefaultSynchronousMetricStorage aggregator; - private final StampedLock sl = new StampedLock(); + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); private ConcurrentHashMap> aggregatorHandles = new ConcurrentHashMap<>(); private final AttributesProcessor attributesProcessor; @@ -85,12 +89,12 @@ Queue> getAggregatorHandlePool() { @Override public void recordLong(long value, Attributes attributes, Context context) { - long stamp = sl.readLock(); + readLock.lock(); try { AggregatorHandle handle = getAggregatorHandle(attributes, context); handle.recordLong(value, attributes, context); } finally { - sl.unlockRead(stamp); + readLock.unlock(); } } @@ -106,12 +110,12 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". 
Dropping measurement."); return; } - long stamp = sl.readLock(); + readLock.lock(); try { AggregatorHandle handle = getAggregatorHandle(attributes, context); handle.recordDouble(value, attributes, context); } finally { - sl.unlockRead(stamp); + readLock.unlock(); } } @@ -160,12 +164,12 @@ public MetricData collect( ConcurrentHashMap> aggregatorHandles; if (reset) { - long stamp = sl.writeLock(); + writeLock.lock(); try { aggregatorHandles = this.aggregatorHandles; this.aggregatorHandles = new ConcurrentHashMap<>(); } finally { - sl.unlockWrite(stamp); + writeLock.unlock(); } } else { aggregatorHandles = this.aggregatorHandles; From e998b729c60361da2b5bfdf09d2d897ddc896a5e Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Thu, 9 Nov 2023 10:43:48 -0600 Subject: [PATCH 3/5] Always read from volatile but never block --- .../DefaultSynchronousMetricStorage.java | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index eae1b6ce856..0a837018483 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -49,11 +49,7 @@ public final class DefaultSynchronousMetricStorage aggregator; - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock readLock = readWriteLock.readLock(); - private final Lock writeLock = readWriteLock.writeLock(); - private ConcurrentHashMap> aggregatorHandles = - new ConcurrentHashMap<>(); + private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); private final AttributesProcessor attributesProcessor; /** @@ -89,9 +85,11 @@ Queue> getAggregatorHandlePool() { @Override public 
void recordLong(long value, Attributes attributes, Context context) { + Lock readLock = aggregatorHolder.lock.readLock(); readLock.lock(); try { - AggregatorHandle handle = getAggregatorHandle(attributes, context); + AggregatorHandle handle = + getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); handle.recordLong(value, attributes, context); } finally { readLock.unlock(); @@ -110,16 +108,21 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". Dropping measurement."); return; } + Lock readLock = aggregatorHolder.lock.readLock(); readLock.lock(); try { - AggregatorHandle handle = getAggregatorHandle(attributes, context); + AggregatorHandle handle = + getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); handle.recordDouble(value, attributes, context); } finally { readLock.unlock(); } } - private AggregatorHandle getAggregatorHandle(Attributes attributes, Context context) { + private AggregatorHandle getAggregatorHandle( + ConcurrentHashMap> aggregatorHandles, + Attributes attributes, + Context context) { Objects.requireNonNull(attributes, "attributes"); attributes = attributesProcessor.process(attributes, context); AggregatorHandle handle = aggregatorHandles.get(attributes); @@ -164,15 +167,17 @@ public MetricData collect( ConcurrentHashMap> aggregatorHandles; if (reset) { + AggregatorHolder holder = this.aggregatorHolder; + this.aggregatorHolder = new AggregatorHolder<>(); + Lock writeLock = holder.lock.writeLock(); writeLock.lock(); try { - aggregatorHandles = this.aggregatorHandles; - this.aggregatorHandles = new ConcurrentHashMap<>(); + aggregatorHandles = holder.aggregatorHandles; } finally { writeLock.unlock(); } } else { - aggregatorHandles = this.aggregatorHandles; + aggregatorHandles = this.aggregatorHolder.aggregatorHandles; } // Grab aggregated points. 
@@ -208,4 +213,10 @@ public MetricData collect( public MetricDescriptor getMetricDescriptor() { return metricDescriptor; } + + private static class AggregatorHolder { + private final ConcurrentHashMap> aggregatorHandles = + new ConcurrentHashMap<>(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + } } From 60c2b4347b35d4bef51fe53d2aa0c87daff9eea3 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Fri, 10 Nov 2023 08:09:19 -0600 Subject: [PATCH 4/5] No volatile variables --- .../state/DefaultSynchronousMetricStorage.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 0a837018483..86740ee6bbb 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,7 +50,9 @@ public final class DefaultSynchronousMetricStorage aggregator; - private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); + private final AtomicInteger collectCount = new AtomicInteger(); + private final AggregatorHolder holder1 = new AggregatorHolder<>(); + private final AggregatorHolder holder2 = new AggregatorHolder<>(); private final AttributesProcessor attributesProcessor; /** @@ -85,6 +88,7 @@ Queue> getAggregatorHandlePool() { @Override public void recordLong(long value, Attributes attributes, Context 
context) { + AggregatorHolder aggregatorHolder = collectCount.get() % 2 == 0 ? holder1 : holder2; Lock readLock = aggregatorHolder.lock.readLock(); readLock.lock(); try { @@ -108,6 +112,7 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". Dropping measurement."); return; } + AggregatorHolder aggregatorHolder = collectCount.get() % 2 == 0 ? holder1 : holder2; Lock readLock = aggregatorHolder.lock.readLock(); readLock.lock(); try { @@ -167,8 +172,7 @@ public MetricData collect( ConcurrentHashMap> aggregatorHandles; if (reset) { - AggregatorHolder holder = this.aggregatorHolder; - this.aggregatorHolder = new AggregatorHolder<>(); + AggregatorHolder holder = collectCount.getAndIncrement() % 2 == 0 ? holder1 : holder2; Lock writeLock = holder.lock.writeLock(); writeLock.lock(); try { @@ -177,7 +181,7 @@ public MetricData collect( writeLock.unlock(); } } else { - aggregatorHandles = this.aggregatorHolder.aggregatorHandles; + aggregatorHandles = holder1.aggregatorHandles; } // Grab aggregated points. @@ -201,6 +205,10 @@ public MetricData collect( aggregatorHandlePool.poll(); } + if (reset) { + aggregatorHandles.clear(); + } + if (points.isEmpty()) { return EmptyMetricData.getInstance(); } From 945def74f82c067c3daa7cb71f54a6b7ab6bfeb4 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Fri, 10 Nov 2023 08:13:33 -0600 Subject: [PATCH 5/5] Revert "No volatile variables" This reverts commit 60c2b4347b35d4bef51fe53d2aa0c87daff9eea3. 
--- .../state/DefaultSynchronousMetricStorage.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 86740ee6bbb..0a837018483 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -26,7 +26,6 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -50,9 +49,7 @@ public final class DefaultSynchronousMetricStorage aggregator; - private final AtomicInteger collectCount = new AtomicInteger(); - private final AggregatorHolder holder1 = new AggregatorHolder<>(); - private final AggregatorHolder holder2 = new AggregatorHolder<>(); + private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); private final AttributesProcessor attributesProcessor; /** @@ -88,7 +85,6 @@ Queue> getAggregatorHandlePool() { @Override public void recordLong(long value, Attributes attributes, Context context) { - AggregatorHolder aggregatorHolder = collectCount.get() % 2 == 0 ? holder1 : holder2; Lock readLock = aggregatorHolder.lock.readLock(); readLock.lock(); try { @@ -112,7 +108,6 @@ public void recordDouble(double value, Attributes attributes, Context context) { + ". Dropping measurement."); return; } - AggregatorHolder aggregatorHolder = collectCount.get() % 2 == 0 ? 
holder1 : holder2; Lock readLock = aggregatorHolder.lock.readLock(); readLock.lock(); try { @@ -172,7 +167,8 @@ public MetricData collect( ConcurrentHashMap> aggregatorHandles; if (reset) { - AggregatorHolder holder = collectCount.getAndIncrement() % 2 == 0 ? holder1 : holder2; + AggregatorHolder holder = this.aggregatorHolder; + this.aggregatorHolder = new AggregatorHolder<>(); Lock writeLock = holder.lock.writeLock(); writeLock.lock(); try { @@ -181,7 +177,7 @@ public MetricData collect( writeLock.unlock(); } } else { - aggregatorHandles = holder1.aggregatorHandles; + aggregatorHandles = this.aggregatorHolder.aggregatorHandles; } // Grab aggregated points. @@ -205,10 +201,6 @@ public MetricData collect( aggregatorHandlePool.poll(); } - if (reset) { - aggregatorHandles.clear(); - } - if (points.isEmpty()) { return EmptyMetricData.getInstance(); }