Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introducing Tracing with OpenTelemetry API #1537

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
d84aae4
feat: Adding TraceUtil interface and its implementation to enable Tra…
jimit-j-shah May 8, 2024
77810da
feat: Adding Lookup RPC OpenTelemetry Tracing (#1437)
jimit-j-shah May 15, 2024
730aa42
feat: Adding Commit RPC Trace Instrumentation (#1440)
jimit-j-shah May 20, 2024
602c5e3
feat: RunQuery trace instrumentation (#1441)
jimit-j-shah May 23, 2024
1e4e7ca
feat: RunAggregationQuery instrumentation (#1447)
jimit-j-shah May 23, 2024
52329e4
feat: RunQuery trace instrumentation
jimit-j-shah May 15, 2024
de05e49
Formatting
jimit-j-shah May 15, 2024
a9965db
Formatting
jimit-j-shah May 15, 2024
3fef66f
Refactor: s/RUNQUERY/RUN_QUERY
jimit-j-shah May 16, 2024
fb8a28e
feat: RunAggregationQuery Trace Instrumentation
jimit-j-shah May 16, 2024
02d178d
Build: retiring test assertions for OpenCensus spans - will be replac…
jimit-j-shah May 16, 2024
a2a2f03
Formatting
jimit-j-shah May 17, 2024
73241a2
Fixing @Test annotation missed after merge
jimit-j-shah May 20, 2024
75a98b4
Formatting
jimit-j-shah May 20, 2024
585181f
feat: Add Transaction tracing
jimit-j-shah May 21, 2024
4edc2ef
test: Transaction test for RunInTransaction - need to fix trace instr…
jimit-j-shah May 21, 2024
22571ea
Adding transaction span names
jimit-j-shah May 23, 2024
3076735
TransactionLookupTest
jimit-j-shah May 23, 2024
8dca867
feat: support for transactional operations
jimit-j-shah May 30, 2024
66cebbb
Revert "feat: support for transactional operations"
jimit-j-shah May 30, 2024
24779b9
feat: support for transactional operations (#1468)
jimit-j-shah Jun 7, 2024
552d04e
feat: Allocateid tracing (#1488)
jimit-j-shah Jun 19, 2024
975464a
feat: Add tracing for ReserveIds operation (#1490)
jimit-j-shah Jun 25, 2024
27caf81
fix: Fixed Span nesting for `ReadWriteTransactionCallable` by using p…
jimit-j-shah Jun 27, 2024
31a6f4a
test: Additional Transaction Testing and cleanup OpenCensus usage (#1…
jimit-j-shah Jul 2, 2024
536c2d9
test: Adding ITTracingTest to verify events and span attributes (whic…
jimit-j-shah Jul 10, 2024
6944e1b
test: Additional Transaction tests and AggregationQuery test (#1518)
jimit-j-shah Jul 19, 2024
5652dd4
fix: Undelete gRPC upgrade docs
jimit-j-shah Aug 5, 2024
aa00fe2
fix: Undo merge mistakes
jimit-j-shah Aug 5, 2024
3a3d7b8
fix: Updating span event strings (#1539)
jimit-j-shah Aug 6, 2024
3640051
Fix: typo in test causing integration test failure (#1556)
jimit-j-shah Sep 4, 2024
f787871
chore(main): release 2.21.0 (#1517)
release-please[bot] Aug 5, 2024
a880548
deps: update dependency com.google.cloud:sdk-platform-java-config to …
renovate-bot Aug 6, 2024
fb43b23
chore(main): release 2.21.1-SNAPSHOT (#1538)
release-please[bot] Aug 6, 2024
2fdc90a
chore(deps): update dependency com.google.cloud:libraries-bom to v26.…
renovate-bot Aug 6, 2024
1706dd5
chore(main): release 2.21.1 (#1540)
release-please[bot] Aug 6, 2024
0b496ba
chore(main): release 2.21.2-SNAPSHOT (#1541)
release-please[bot] Aug 20, 2024
16b3515
chore: secure hermetic_library_generation workflow (#1552)
diegomarquezp Aug 20, 2024
ea98972
deps: update dependency com.google.cloud:sdk-platform-java-config to …
renovate-bot Aug 22, 2024
fb4bd08
chore(main): release 2.21.2 (#1553)
release-please[bot] Aug 23, 2024
3be4fb7
chore: remove datastore native image sample in favor of sample hosted…
mpeddada1 Aug 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: support for transactional operations
- tested using newTransaction() and runInTransaction()
  • Loading branch information
jimit-j-shah committed Aug 5, 2024
commit 8dca8671a79043eb6b322e2d9c509be90071c607
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;

final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datastore {

Expand Down Expand Up @@ -103,13 +105,18 @@ static class ReadWriteTransactionCallable<T> implements Callable<T> {
private final TransactionCallable<T> callable;
private volatile TransactionOptions options;
private volatile Transaction transaction;
@Nonnull private final Context transactionTraceContext;

ReadWriteTransactionCallable(
Datastore datastore, TransactionCallable<T> callable, TransactionOptions options) {
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
@Nonnull Context parentTraceContext) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
this.transaction = null;
this.transactionTraceContext = parentTraceContext;
}

Datastore getDatastore() {
Expand All @@ -132,8 +139,9 @@ void setPrevTransactionId(ByteString transactionId) {

@Override
public T call() throws DatastoreException {
transaction = datastore.newTransaction(options);
try {
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored =
transactionTraceContext.makeCurrent()) {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
Expand All @@ -154,36 +162,41 @@ public T call() throws DatastoreException {

@Override
public <T> T runInTransaction(final TransactionCallable<T> callable) {
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(this, callable, null),
new ReadWriteTransactionCallable<T>(this, callable, null, otelTraceUtil.currentContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}

@Override
public <T> T runInTransaction(
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(this, callable, transactionOptions),
new ReadWriteTransactionCallable<T>(
this, callable, transactionOptions, otelTraceUtil.currentContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}

Expand Down Expand Up @@ -634,10 +647,14 @@ private com.google.datastore.v1.CommitResponse commitMutation(

com.google.datastore.v1.CommitResponse commit(
final com.google.datastore.v1.CommitRequest requestPb) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT);
final boolean isTransactional =
requestPb.hasTransaction() || requestPb.hasSingleUseTransaction();
final String spanName =
isTransactional
? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT;
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);
span.setAttribute("isTransactional", requestPb.hasTransaction());

try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.commit(requestPb),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.api.core.BetaApi;
import com.google.cloud.datastore.models.ExplainOptions;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
import com.google.common.collect.ImmutableList;
import com.google.datastore.v1.ReadOptions;
import com.google.datastore.v1.TransactionOptions;
Expand All @@ -28,6 +30,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;

final class TransactionImpl extends BaseDatastoreBatchWriter implements Transaction {

Expand All @@ -37,6 +40,8 @@ final class TransactionImpl extends BaseDatastoreBatchWriter implements Transact

private final ReadOptionProtoPreparer readOptionProtoPreparer;

@Nonnull private final Context transactionTraceContext;

static class ResponseImpl implements Transaction.Response {

private final com.google.datastore.v1.CommitResponse response;
Expand Down Expand Up @@ -78,6 +83,7 @@ public List<Key> getGeneratedKeys() {

transactionId = datastore.requestTransactionId(requestPb);
this.readOptionProtoPreparer = new ReadOptionProtoPreparer();
this.transactionTraceContext = datastore.getOptions().getTraceUtil().currentContext();
}

@Override
Expand All @@ -96,7 +102,9 @@ public Iterator<Entity> get(Key... keys) {
@Override
public List<Entity> fetch(Key... keys) {
validateActive();
return DatastoreHelper.fetch(this, keys);
try (Scope ignored = transactionTraceContext.makeCurrent()) {
return DatastoreHelper.fetch(this, keys);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.datastore.DatastoreOptions;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.context.Context;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -35,6 +36,7 @@ public interface TraceUtil {
static final String SPAN_NAME_COMMIT = "Commit";
static final String SPAN_NAME_RUN_QUERY = "RunQuery";
static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery";
static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.run";
static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin";
static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup";
static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN;
import static com.google.common.truth.Truth.assertThat;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -749,6 +751,7 @@ public void transactionalLookupTest() throws Exception {
try (Scope ignored = rootSpan.makeCurrent()) {
Transaction transaction = datastore.newTransaction();
Entity entity = datastore.get(KEY1, ReadOption.transactionId(transaction.getTransactionId()));
transaction.commit();
assertNull(entity);
} finally {
rootSpan.end();
Expand All @@ -757,18 +760,55 @@ public void transactionalLookupTest() throws Exception {

fetchAndValidateTrace(
customSpanContext.getTraceId(),
/*numExpectedSpans=*/ 2,
Collections.singletonList(Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION)));
/*numExpectedSpans=*/ 3,
Arrays.asList(
Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION),
Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP),
Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT)));
}

@Test
public void runInTransactionQueryTest() throws Exception {
Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build();
Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build();
List<Entity> entityList = new ArrayList<>();
entityList.add(entity1);
entityList.add(entity2);

List<Entity> response = datastore.add(entity1, entity2);
assertEquals(entityList, response);

assertNotNull(customSpanContext);

Span rootSpan = getNewRootSpanWithContext();
try (Scope ignored = rootSpan.makeCurrent()) {
PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field"));
Query<Entity> query =
Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build();
Datastore.TransactionCallable<Boolean> callable =
transaction -> {
QueryResults<Entity> queryResults = datastore.run(query);
assertTrue(queryResults.hasNext());
assertEquals(entity1, queryResults.next());
assertFalse(queryResults.hasNext());
return true;
};
datastore.runInTransaction(callable);
} finally {
rootSpan.end();
}
waitForTracesToComplete();

fetchAndValidateTrace(
customSpanContext.getTraceId(),
/*numExpectedSpans=*/ 2,
Collections.singletonList(Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP)));
/*numExpectedSpans=*/ 4,
Arrays.asList(
Collections.singletonList(SPAN_NAME_TRANSACTION_RUN),
Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION),
Collections.singletonList(SPAN_NAME_RUN_QUERY),
Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT)));
}

@Test
public void runInTransactionQueryTest() throws Exception {}

@Test
public void runInTransactionAggregationQueryTest() throws Exception {}

Expand Down