Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: count row merging errors as internal errors #2045

Merged
merged 3 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: count row merging errors as internal errors
Currently they dont have a status associated and thus get counted as UNKOWN

Change-Id: Ida3470a0609f2e2ad51534eb3141db394af1dcdc
  • Loading branch information
igorbernstein2 committed Jan 9, 2024
commit 40573abe05b5c3eeab9560c13fa21e86a2765a13
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
import com.google.cloud.bigtable.data.v2.models.RowAdapter.RowBuilder;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.protobuf.ByteString;
import io.grpc.Status;

import java.util.List;

/**
Expand Down Expand Up @@ -252,6 +257,16 @@ State handleChunk(CellChunk chunk) {
new State() {
@Override
State handleLastScannedRow(ByteString rowKey) {
if (lastCompleteRowKey != null) {
int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, rowKey);
String direction = "increasing";
if (reversed) {
cmp *= -1;
direction = "decreasing";
}

validate(cmp < 0, "AWAITING_NEW_ROW: key must be strictly " + direction);
}
completeRow = adapter.createScanMarkerRow(rowKey);
lastCompleteRowKey = rowKey;
return AWAITING_ROW_CONSUME;
Expand Down Expand Up @@ -468,9 +483,9 @@ private void validate(boolean condition, String message) {
}
}

static class InvalidInputException extends RuntimeException {
static class InvalidInputException extends InternalException {
InvalidInputException(String message) {
super(message);
super(message, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.google.cloud.bigtable.data.v2.functional;

import com.google.api.gax.rpc.InternalException;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
import io.grpc.Server;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RunWith(JUnit4.class)
public class ReadRowsTest {
private FakeService service;
private Server server;

@Before
public void setUp() throws Exception {
service = new FakeService();
server = FakeServiceBuilder.create(service)
.start();
}

@After
public void tearDown() throws Exception {
server.shutdown();
}

@Test
public void rowMergingErrorsUseInternalStatus() throws Exception {
BigtableDataSettings settings = BigtableDataSettings.newBuilderForEmulator(server.getPort())
.setProjectId("fake-project")
.setInstanceId("fake-instance")
.build();

service.readRowsResponses.add(
ReadRowsResponse.newBuilder()
.addChunks(
ReadRowsResponse.CellChunk.newBuilder()
.setRowKey(ByteString.copyFromUtf8("z"))
.setFamilyName(StringValue.newBuilder().setValue("f"))
.setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
.setTimestampMicros(1000)
.setValue(ByteString.copyFromUtf8("v"))
.setCommitRow(true)
)
.addChunks(
ReadRowsResponse.CellChunk.newBuilder()
.setRowKey(ByteString.copyFromUtf8("a"))
.setFamilyName(StringValue.newBuilder().setValue("f"))
.setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")).build())
.setTimestampMicros(1000)
.setValue(ByteString.copyFromUtf8("v"))
.setCommitRow(true)
)
.build()
);

try (BigtableDataClient client = BigtableDataClient.create(settings)) {
Assert.assertThrows(
InternalException.class,
() -> {
for (Row ignored : client.readRows(Query.create("fake-table"))) {

}
}
);
}
}


static class FakeService extends BigtableGrpc.BigtableImplBase {
private List<ReadRowsResponse> readRowsResponses = Collections.synchronizedList(new ArrayList<>());

@Override
public void readRows(ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
for (ReadRowsResponse r : readRowsResponses) {
responseObserver.onNext(r);
}
responseObserver.onCompleted();
}
}
}