Skip to content

Commit

Permalink
Stop using updateAndGet for android compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed Nov 10, 2023
1 parent f8c9ffe commit 5452481
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,14 @@ Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {
@Override
public void recordLong(long value, Attributes attributes, Context context) {
// Obtain the AggregatorHolder, re-reading the volatile this.aggregatorHolder until we access
// one where recordsInProgress != -1. Collect sets recordsInProgress to -1 as a signal that
// one where recordsInProgress is odd. Collect sets recordsInProgress to -1 as a signal that
// AggregatorHolder is stale and is being replaced. Record operations increment recordInProgress
// and decrement when complete as a signal to Collect that record operations are active and must
// complete before its safe to collect.
// and decrement by multiples of 2 when complete as a signal to Collect that record operations
// are active and must complete before its safe to collect.
AggregatorHolder<T, U> aggregatorHolder;
do {
aggregatorHolder = this.aggregatorHolder;
} while (aggregatorHolder.recordsInProgress.updateAndGet(
operand -> operand < 0 ? operand : operand + 1)
< 0);
} while (aggregatorHolder.recordsInProgress.addAndGet(2) % 2 != 0);
try {
AggregatorHandle<T, U> handle =
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
Expand All @@ -116,22 +114,20 @@ public void recordDouble(double value, Attributes attributes, Context context) {
return;
}
// Obtain the AggregatorHolder, re-reading the volatile this.aggregatorHolder until we access
// one where recordsInProgress != -1. Collect sets recordsInProgress to -1 as a signal that
// one where recordsInProgress is odd. Collect sets recordsInProgress to -1 as a signal that
// AggregatorHolder is stale and is being replaced. Record operations increment recordInProgress
// and decrement when complete as a signal to Collect that record operations are active and must
// complete before its safe to collect.
// and decrement by multiples of 2 when complete as a signal to Collect that record operations
// are active and must complete before its safe to collect.
AggregatorHolder<T, U> aggregatorHolder;
do {
aggregatorHolder = this.aggregatorHolder;
} while (aggregatorHolder.recordsInProgress.updateAndGet(
operand -> operand < 0 ? operand : operand + 1)
< 0);
} while (aggregatorHolder.recordsInProgress.addAndGet(2) % 2 != 0);
try {
AggregatorHandle<T, U> handle =
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
handle.recordDouble(value, attributes, context);
} finally {
aggregatorHolder.recordsInProgress.decrementAndGet();
aggregatorHolder.recordsInProgress.addAndGet(-2);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,61 +459,4 @@ private static Stream<Arguments> concurrentStressTestArguments() {
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.set(value)));
}

@RepeatedTest(100)
void recordAndCollect_concurrentStressTestRepeated() {
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT);
BiConsumer<Double, AtomicDouble> collect =
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.addAndGet(value);

// Define record threads. Each records a value of 1.0, 2000 times
List<Thread> threads = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
Thread thread =
new Thread(
() -> {
for (int j = 0; j < 2000; j++) {
storage.recordDouble(1.0, Attributes.empty(), Context.current());
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
}
latch.countDown();
});
threads.add(thread);
}

// Define collect thread. Collect thread collects and aggregates the
AtomicDouble cumulativeSum = new AtomicDouble();
Thread collectThread =
new Thread(
() -> {
do {
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
MetricData metricData =
storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1);
if (metricData.isEmpty()) {
continue;
}
metricData.getDoubleSumData().getPoints().stream()
.findFirst()
.ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum));
} while (latch.getCount() != 0);
});

// Start all the threads
collectThread.start();
threads.forEach(Thread::start);

// Wait for the collect thread to end, which collects until the record threads are done
Uninterruptibles.joinUninterruptibly(collectThread);

assertThat(cumulativeSum.get()).isEqualTo(8000.0);
}
}

0 comments on commit 5452481

Please sign in to comment.