From 545248104ba876fc14a0e684474c25f625e66356 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Fri, 10 Nov 2023 10:54:20 -0600 Subject: [PATCH] Stop using updateAndGet for android compatibility --- .../DefaultSynchronousMetricStorage.java | 22 +++---- .../state/SynchronousMetricStorageTest.java | 57 ------------------- 2 files changed, 9 insertions(+), 70 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 577024a05dd..d588eb2a016 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -84,16 +84,14 @@ Queue> getAggregatorHandlePool() { @Override public void recordLong(long value, Attributes attributes, Context context) { // Obtain the AggregatorHolder, re-reading the volatile this.aggregatorHolder until we access - // one where recordsInProgress != -1. Collect sets recordsInProgress to -1 as a signal that + // one where recordsInProgress is odd. Collect sets recordsInProgress to -1 as a signal that // AggregatorHolder is stale and is being replaced. Record operations increment recordInProgress - // and decrement when complete as a signal to Collect that record operations are active and must - // complete before its safe to collect. + // and decrement by multiples of 2 when complete as a signal to Collect that record operations + // are active and must complete before its safe to collect. AggregatorHolder aggregatorHolder; do { aggregatorHolder = this.aggregatorHolder; - } while (aggregatorHolder.recordsInProgress.updateAndGet( - operand -> operand < 0 ? operand : operand + 1) - < 0); + } while (aggregatorHolder.recordsInProgress.addAndGet(2) % 2 != 0); try { AggregatorHandle handle = getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); @@ -116,22 +114,20 @@ public void recordDouble(double value, Attributes attributes, Context context) { return; } // Obtain the AggregatorHolder, re-reading the volatile this.aggregatorHolder until we access - // one where recordsInProgress != -1. Collect sets recordsInProgress to -1 as a signal that + // one where recordsInProgress is odd. Collect sets recordsInProgress to -1 as a signal that // AggregatorHolder is stale and is being replaced. Record operations increment recordInProgress - // and decrement when complete as a signal to Collect that record operations are active and must - // complete before its safe to collect. + // and decrement by multiples of 2 when complete as a signal to Collect that record operations + // are active and must complete before its safe to collect. AggregatorHolder aggregatorHolder; do { aggregatorHolder = this.aggregatorHolder; - } while (aggregatorHolder.recordsInProgress.updateAndGet( - operand -> operand < 0 ? operand : operand + 1) - < 0); + } while (aggregatorHolder.recordsInProgress.addAndGet(2) % 2 != 0); try { AggregatorHandle handle = getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); handle.recordDouble(value, attributes, context); } finally { - aggregatorHolder.recordsInProgress.decrementAndGet(); + aggregatorHolder.recordsInProgress.addAndGet(-2); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index 9ce7682b279..0563f5f6a12 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -459,61 +459,4 @@ private static Stream concurrentStressTestArguments() { (BiConsumer) (value, cumulativeCount) -> cumulativeCount.set(value))); } - - @RepeatedTest(100) - void recordAndCollect_concurrentStressTestRepeated() { - DefaultSynchronousMetricStorage storage = - new DefaultSynchronousMetricStorage<>( - RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()), - METRIC_DESCRIPTOR, - aggregator, - AttributesProcessor.noop(), - CARDINALITY_LIMIT); - BiConsumer collect = - (BiConsumer) - (value, cumulativeCount) -> cumulativeCount.addAndGet(value); - - // Define record threads. Each records a value of 1.0, 2000 times - List threads = new ArrayList<>(); - CountDownLatch latch = new CountDownLatch(4); - for (int i = 0; i < 4; i++) { - Thread thread = - new Thread( - () -> { - for (int j = 0; j < 2000; j++) { - storage.recordDouble(1.0, Attributes.empty(), Context.current()); - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); - } - latch.countDown(); - }); - threads.add(thread); - } - - // Define collect thread. Collect thread collects and aggregates the - AtomicDouble cumulativeSum = new AtomicDouble(); - Thread collectThread = - new Thread( - () -> { - do { - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); - MetricData metricData = - storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1); - if (metricData.isEmpty()) { - continue; - } - metricData.getDoubleSumData().getPoints().stream() - .findFirst() - .ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum)); - } while (latch.getCount() != 0); - }); - - // Start all the threads - collectThread.start(); - threads.forEach(Thread::start); - - // Wait for the collect thread to end, which collects until the record threads are done - Uninterruptibles.joinUninterruptibly(collectThread); - - assertThat(cumulativeSum.get()).isEqualTo(8000.0); - } }