Skip to content

Commit

Permalink
fix: abstract the resource a batch is using
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jun 22, 2023
1 parent 8cbea70 commit 5a7a290
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 31 deletions.
6 changes: 6 additions & 0 deletions gax-java/gax/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@
<method>*setWaitTimeout*</method>
<to>com.google.api.gax.rpc.ServerStreamingCallSettings$Builder</to>
</difference>
<!-- BatchingDescriptor is an InternalApi intended for google cloud libraries use only -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/api/gax/batching/BatchingDescriptor</className>
<method>*</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2023 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.InternalApi;

/**
* Represent the resource used by a batch including element and byte. It can also be extended to
* other things to determine if adding a new element needs to be flow controlled or if the current
* batch needs to be flushed.
*/
@InternalApi("For google-cloud-java client use only.")
public interface BatchResource {

/** Adds the additional resource. */
BatchResource add(BatchResource resource);

/** Returns the element count of this resource. */
long getElementCount();

/** Returns the byte count of this resource. */
long getByteCount();

/** Returns true if the resource is empty. */
boolean isEmpty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final FlowController flowController;
private final ApiCallContext callContext;

private final long elementThreshold;

private final long bytesThreshold;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
Expand Down Expand Up @@ -192,7 +196,7 @@ public BatcherImpl(
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
}
this.flowController = flowController;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
Expand All @@ -204,6 +208,11 @@ public BatcherImpl(
}
currentBatcherReference = new BatcherReference(this);
this.callContext = callContext;

Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
}

/** {@inheritDoc} */
Expand All @@ -213,7 +222,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// will only be done from a single calling thread.
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");

long bytesSize = batchingDescriptor.countBytes(element);
BatchResource newResource = batchingDescriptor.createResource(element);

// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
Expand All @@ -232,7 +241,7 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// defer it till we decide on if refactoring FlowController is necessary.
Stopwatch stopwatch = Stopwatch.createStarted();
try {
flowController.reserve(1, bytesSize);
flowController.reserve(newResource.getElementCount(), newResource.getByteCount());
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException.fromFlowControlException(e);
Expand All @@ -241,12 +250,15 @@ public ApiFuture<ElementResultT> add(ElementT element) {

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result, throttledTimeMs);
}
if (!currentOpenBatch.isEmpty()
&& batchingDescriptor.shouldFlush(
currentOpenBatch.resource.add(newResource), elementThreshold, bytesThreshold)) {
sendOutstanding();
}

if (currentOpenBatch.hasAnyThresholdReached()) {
sendOutstanding();
currentOpenBatch.add(element, newResource, result, throttledTimeMs);
}

return result;
}

Expand All @@ -267,7 +279,7 @@ public void sendOutstanding() {
return;
}
accumulatedBatch = currentOpenBatch;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batcherStats);
}

// This check is for old clients that instantiated the batcher without ApiCallContext
Expand All @@ -291,7 +303,9 @@ public void sendOutstanding() {
@Override
public void onSuccess(ResponseT response) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
flowController.release(
accumulatedBatch.resource.getElementCount(),
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
Expand All @@ -301,7 +315,9 @@ public void onSuccess(ResponseT response) {
@Override
public void onFailure(Throwable throwable) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
flowController.release(
accumulatedBatch.resource.getElementCount(),
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
Expand Down Expand Up @@ -412,35 +428,31 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<BatchEntry<ElementT, ElementResultT>> entries;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final BatcherStats batcherStats;
private final long elementThreshold;
private final long bytesThreshold;

private long elementCounter = 0;
private long byteCounter = 0;
private final BatcherStats batcherStats;
private long totalThrottledTimeMs = 0;
private BatchResource resource;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings,
BatcherStats batcherStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.entries = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
this.batcherStats = batcherStats;
this.resource = descriptor.createEmptyResource();
}

void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
void add(
ElementT element,
BatchResource newResource,
SettableApiFuture<ElementResultT> result,
long throttledTimeMs) {
builder.add(element);
entries.add(BatchEntry.create(element, result));
elementCounter++;
byteCounter += descriptor.countBytes(element);
totalThrottledTimeMs += throttledTimeMs;
resource = resource.add(newResource);
}

void onBatchSuccess(ResponseT response) {
Expand All @@ -464,11 +476,7 @@ void onBatchFailure(Throwable throwable) {
}

boolean isEmpty() {
return elementCounter == 0;
}

boolean hasAnyThresholdReached() {
return elementCounter >= elementThreshold || byteCounter >= bytesThreshold;
return resource.isEmpty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,24 @@ public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, Response

/** Returns the size of the passed element object in bytes. */
long countBytes(ElementT element);

/** Creates a new {@link BatchResource} with ElementT. */
default BatchResource createResource(ElementT element) {
return new DefaultBatchResource(1, countBytes(element));
}

/** Create an empty {@link BatchResource}. */
default BatchResource createEmptyResource() {
return new DefaultBatchResource(0, 0);
}

/**
* Checks if the current {@link BatchResource} should be flushed based on the maxElementThreshold
* and maxBytesThreshold.
*/
default boolean shouldFlush(
BatchResource resource, long maxElementThreshold, long maxBytesThreshold) {
return resource.getElementCount() > maxElementThreshold
|| resource.getByteCount() > maxBytesThreshold;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2023 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.common.base.Preconditions;

/**
* The default implementation of {@link BatchResource} which tracks the elementCount and byteCount.
*/
final class DefaultBatchResource implements BatchResource {

private long elementCount;
private long byteCount;

DefaultBatchResource(long elementCount, long byteCount) {
this.elementCount = elementCount;
this.byteCount = byteCount;
}

@Override
public BatchResource add(BatchResource resource) {
Preconditions.checkArgument(
resource instanceof DefaultBatchResource,
"BatchResource needs to be an instance of DefaultBatchResource");
this.elementCount += ((DefaultBatchResource) resource).elementCount;
this.byteCount += ((DefaultBatchResource) resource).byteCount;
return this;
}

@Override
public long getElementCount() {
return elementCount;
}

@Override
public long getByteCount() {
return byteCount;
}

@Override
public boolean isEmpty() {
return elementCount == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class BatcherImplTest {
BatchingSettings.newBuilder()
.setElementCountThreshold(1000L)
.setRequestByteThreshold(1000L)
.setDelayThreshold(Duration.ofSeconds(1))
.setDelayThreshold(Duration.ofSeconds(1000))
.build();

@After
Expand Down Expand Up @@ -376,6 +376,7 @@ public void testWhenThresholdIsDisabled() throws Exception {
.build();
underTest = createDefaultBatcherImpl(settings, null);
Future<Integer> result = underTest.add(2);
underTest.add(3);
assertThat(result.isDone()).isTrue();
assertThat(result.get()).isEqualTo(4);
}
Expand Down Expand Up @@ -895,7 +896,7 @@ public void run() {

// Mockito recommends using verify() as the ONLY way to interact with Argument
// captors - otherwise it may incur in unexpected behaviour
Mockito.verify(callContext).withOption(key.capture(), value.capture());
Mockito.verify(callContext, Mockito.timeout(100)).withOption(key.capture(), value.capture());

// Verify that throttled time is recorded in ApiCallContext
assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY);
Expand Down Expand Up @@ -1012,8 +1013,9 @@ private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest = createDefaultBatcherImpl(settings, null);
Future<Integer> result = underTest.add(4);
assertThat(result.isDone()).isFalse();
// After this element is added, the batch triggers sendOutstanding().
Future<Integer> anotherResult = underTest.add(5);
// After this element is added, the batch triggers sendOutstanding().
underTest.add(6);
// Both the elements should be resolved now.
assertThat(result.isDone()).isTrue();
assertThat(result.get()).isEqualTo(16);
Expand Down

0 comments on commit 5a7a290

Please sign in to comment.