Skip to content

Commit

Permalink
Use android compatible methods on AtomicInteger
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed Nov 10, 2023
1 parent f8c9ffe commit 7121615
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,13 @@ Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {

@Override
public void recordLong(long value, Attributes attributes, Context context) {
// Obtain the AggregatorHolder, re-reading the volatile this.aggregatorHolder until we access
// one where recordsInProgress != -1. Collect sets recordsInProgress to -1 as a signal that
// AggregatorHolder is stale and is being replaced. Record operations increment recordInProgress
// and decrement when complete as a signal to Collect that record operations are active and must
// complete before its safe to collect.
AggregatorHolder<T, U> aggregatorHolder;
do {
aggregatorHolder = this.aggregatorHolder;
} while (aggregatorHolder.recordsInProgress.updateAndGet(
operand -> operand < 0 ? operand : operand + 1)
< 0);
AggregatorHolder<T, U> aggregatorHolder = getHolderForRecord();
try {
AggregatorHandle<T, U> handle =
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
handle.recordLong(value, attributes, context);
} finally {
aggregatorHolder.recordsInProgress.decrementAndGet();
aggregatorHolder.recordsInProgress.addAndGet(-2);
}
}

Expand All @@ -115,26 +105,38 @@ public void recordDouble(double value, Attributes attributes, Context context) {
+ ". Dropping measurement.");
return;
}
// Obtain the AggregatorHolder, re-reading the volatile this.aggregatorHolder until we access
// one where recordsInProgress != -1. Collect sets recordsInProgress to -1 as a signal that
// AggregatorHolder is stale and is being replaced. Record operations increment recordInProgress
// and decrement when complete as a signal to Collect that record operations are active and must
// complete before its safe to collect.
AggregatorHolder<T, U> aggregatorHolder;
do {
aggregatorHolder = this.aggregatorHolder;
} while (aggregatorHolder.recordsInProgress.updateAndGet(
operand -> operand < 0 ? operand : operand + 1)
< 0);
AggregatorHolder<T, U> aggregatorHolder = getHolderForRecord();
try {
AggregatorHandle<T, U> handle =
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
handle.recordDouble(value, attributes, context);
} finally {
aggregatorHolder.recordsInProgress.decrementAndGet();
aggregatorHolder.recordsInProgress.addAndGet(-2);
}
}

/**
* Obtain the AggregatorHolder for recording measurements, re-reading the volatile
* this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets
* recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced.
* Record operations increment recordInProgress by 2. Callers MUST decrement recordsInProgress by
* -2 when complete as a signal to Collect that record operations are active and must complete
* before its safe to collect.
*/
private AggregatorHolder<T, U> getHolderForRecord() {
do {
AggregatorHolder<T, U> aggregatorHolder = this.aggregatorHolder;
int recordsInProgress = aggregatorHolder.recordsInProgress.addAndGet(2);
if (recordsInProgress % 2 == 0) {
return aggregatorHolder;
} else {
// Collect is in progress, decrement recordsInProgress to allow collect to proceed and
// re-read aggregatorHolder
aggregatorHolder.recordsInProgress.addAndGet(-2);
}
} while (true);
}

private AggregatorHandle<T, U> getAggregatorHandle(
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles,
Attributes attributes,
Expand Down Expand Up @@ -185,11 +187,14 @@ public MetricData collect(
if (reset) {
AggregatorHolder<T, U> holder = this.aggregatorHolder;
this.aggregatorHolder = new AggregatorHolder<>();
// Set recordsInProgress to -1, which causes record operations to re-read the volatile
// this.aggregatorHolder
// Use compareAndSet in a loop to confirm we only set recordsInProgress if there aren't any
// records operations in progress.
while (!holder.recordsInProgress.compareAndSet(0, -1)) {}
// Increment recordsInProgress by 1, which produces an odd number acting as a signal that
// record operations should re-read the volatile this.aggregatorHolder.
// Repeatedly grab recordsInProgress until it is <= 1, which signals all active record
// operations are complete.
int recordsInProgress = holder.recordsInProgress.addAndGet(1);
while (recordsInProgress > 1) {
recordsInProgress = holder.recordsInProgress.get();
}
aggregatorHandles = holder.aggregatorHandles;
} else {
aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,61 +459,4 @@ private static Stream<Arguments> concurrentStressTestArguments() {
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.set(value)));
}

@RepeatedTest(100)
void recordAndCollect_concurrentStressTestRepeated() {
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT);
BiConsumer<Double, AtomicDouble> collect =
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.addAndGet(value);

// Define record threads. Each records a value of 1.0, 2000 times
List<Thread> threads = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
Thread thread =
new Thread(
() -> {
for (int j = 0; j < 2000; j++) {
storage.recordDouble(1.0, Attributes.empty(), Context.current());
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
}
latch.countDown();
});
threads.add(thread);
}

// Define collect thread. Collect thread collects and aggregates the
AtomicDouble cumulativeSum = new AtomicDouble();
Thread collectThread =
new Thread(
() -> {
do {
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
MetricData metricData =
storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1);
if (metricData.isEmpty()) {
continue;
}
metricData.getDoubleSumData().getPoints().stream()
.findFirst()
.ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum));
} while (latch.getCount() != 0);
});

// Start all the threads
collectThread.start();
threads.forEach(Thread::start);

// Wait for the collect thread to end, which collects until the record threads are done
Uninterruptibles.joinUninterruptibly(collectThread);

assertThat(cumulativeSum.get()).isEqualTo(8000.0);
}
}

0 comments on commit 7121615

Please sign in to comment.