Skip to content

Commit

Permalink
feat: add tagging support (#1419)
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite authored Jul 8, 2021
1 parent b72bc6d commit 4770dab
Show file tree
Hide file tree
Showing 10 changed files with 474 additions and 71 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ system-test/*key.json
.DS_Store
package-lock.json
__pycache__
.idea
4 changes: 4 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2662,6 +2662,10 @@ class Database extends common.GrpcServiceObject {
while (true) {
try {
const [session, transaction] = await promisify(getWriteSession)();
transaction.requestOptions = Object.assign(
transaction.requestOptions || {},
options.requestOptions
);
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
Expand Down
6 changes: 4 additions & 2 deletions src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
import {ServiceObjectConfig} from '@google-cloud/common';
import {NormalCallback, CLOUD_RESOURCE_HEADER} from './common';
import {grpc, CallOptions} from 'google-gax';
import IRequestOptions = google.spanner.v1.IRequestOptions;

export type GetSessionResponse = [Session, r.Response];

Expand Down Expand Up @@ -450,9 +451,10 @@ export class Session extends common.GrpcServiceObject {
* const transaction = session.transaction();
*/
transaction(
queryOptions?: google.spanner.v1.ExecuteSqlRequest.IQueryOptions
queryOptions?: google.spanner.v1.ExecuteSqlRequest.IQueryOptions,
requestOptions?: Pick<IRequestOptions, 'transactionTag'>
) {
return new Transaction(this, undefined, queryOptions);
return new Transaction(this, undefined, queryOptions, requestOptions);
}
/**
* Format the session name to include the parent database's name.
Expand Down
22 changes: 15 additions & 7 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
} from './transaction';
import {google as databaseAdmin} from '../protos/protos';
import {Schema, LongRunningCallback} from './common';
import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions;

export type Key = string | string[];

Expand All @@ -43,25 +44,30 @@ export type CreateTableCallback = LongRunningCallback<Table>;
export type DropTableResponse = UpdateSchemaResponse;
export type DropTableCallback = UpdateSchemaCallback;

interface MutateRowsOptions extends CommitOptions {
requestOptions?: Omit<IRequestOptions, 'requestTag'>;
}

export type DeleteRowsCallback = CommitCallback;
export type DeleteRowsResponse = CommitResponse;
export type DeleteRowsOptions = CommitOptions;
export type DeleteRowsOptions = MutateRowsOptions;

export type InsertRowsCallback = CommitCallback;
export type InsertRowsResponse = CommitResponse;
export type InsertRowsOptions = CommitOptions;
export type InsertRowsOptions = MutateRowsOptions;

export type ReplaceRowsCallback = CommitCallback;
export type ReplaceRowsResponse = CommitResponse;
export type ReplaceRowsOptions = CommitOptions;
export type ReplaceRowsOptions = MutateRowsOptions;

export type UpdateRowsCallback = CommitCallback;
export type UpdateRowsResponse = CommitResponse;
export type UpdateRowsOptions = CommitOptions;
export type UpdateRowsOptions = MutateRowsOptions;

export type UpsertRowsCallback = CommitCallback;
export type UpsertRowsResponse = CommitResponse;
export type UpsertRowsOptions = CommitOptions;
export type UpsertRowsOptions = MutateRowsOptions;

/**
* Create a Table object to interact with a table in a Cloud Spanner
* database.
Expand Down Expand Up @@ -1012,10 +1018,12 @@ class Table {
private _mutate(
method: 'deleteRows' | 'insert' | 'replace' | 'update' | 'upsert',
rows: object | object[],
options: CommitOptions | CallOptions,
options: MutateRowsOptions | CallOptions = {},
callback: CommitCallback
): void {
this.database.runTransaction((err, transaction) => {
const requestOptions =
'requestOptions' in options ? options.requestOptions : {};
this.database.runTransaction({requestOptions}, (err, transaction) => {
if (err) {
callback(err);
return;
Expand Down
3 changes: 3 additions & 0 deletions src/transaction-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {Transaction} from './transaction';
import {NormalCallback} from './common';
import {isSessionNotFoundError} from './session-pool';
import {Database} from './database';
import {google} from '../protos/protos';
import IRequestOptions = google.spanner.v1.IRequestOptions;

// eslint-disable-next-line @typescript-eslint/no-var-requires
const jsonProtos = require('../protos/protos.json');
Expand All @@ -41,6 +43,7 @@ const RetryInfo = Root.fromJSON(jsonProtos).lookup('google.rpc.RetryInfo');
*/
export interface RunTransactionOptions {
timeout?: number;
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
}

/**
Expand Down
79 changes: 68 additions & 11 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ export type RunResponse = [
];
export type RunUpdateResponse = [number];

export interface BatchUpdateOptions {
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
gaxOptions?: CallOptions;
}
export interface BatchUpdateCallback {
(
err: null | BatchUpdateError,
Expand Down Expand Up @@ -220,6 +224,7 @@ export class Snapshot extends EventEmitter {
session: Session;
queryOptions?: IQueryOptions;
resourceHeader_: {[k: string]: string};
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;

/**
* The transaction ID.
Expand Down Expand Up @@ -344,6 +349,14 @@ export class Snapshot extends EventEmitter {
options,
};

// Only hand crafted read-write transactions will be able to set a
// transaction tag for the BeginTransaction RPC. Also, this.requestOptions
// is only set in the constructor of Transaction, which is the constructor
// for read/write transactions.
if (this.requestOptions) {
reqOpts.requestOptions = this.requestOptions;
}

this.request(
{
client: 'SpannerClient',
Expand Down Expand Up @@ -538,7 +551,8 @@ export class Snapshot extends EventEmitter {
table: string,
request = {} as ReadRequest
): PartialResultStream {
const {gaxOptions, json, jsonOptions, maxResumeRetries} = request;
const {gaxOptions, json, jsonOptions, maxResumeRetries, requestOptions} =
request;
const keySet = Snapshot.encodeKeySet(request);
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};

Expand All @@ -556,13 +570,22 @@ export class Snapshot extends EventEmitter {
delete request.maxResumeRetries;
delete request.keys;
delete request.ranges;
delete request.requestOptions;

const reqOpts: ReadRequest = Object.assign(request, {
session: this.session.formattedName_!,
transaction,
table,
keySet,
});
const reqOpts: spannerClient.spanner.v1.IReadRequest = Object.assign(
request,
{
session: this.session.formattedName_!,
requestOptions: this.configureTagOptions(
typeof transaction.singleUse !== 'undefined',
this.requestOptions?.transactionTag!,
requestOptions
),
transaction,
table,
keySet,
}
);

const makeRequest = (resumeToken?: ResumeToken): Readable => {
return this.requestStream({
Expand Down Expand Up @@ -965,7 +988,8 @@ export class Snapshot extends EventEmitter {
query.queryOptions
);

const {gaxOptions, json, jsonOptions, maxResumeRetries} = query;
const {gaxOptions, json, jsonOptions, maxResumeRetries, requestOptions} =
query;
let reqOpts;

const sanitizeRequest = () => {
Expand All @@ -981,10 +1005,17 @@ export class Snapshot extends EventEmitter {
delete query.json;
delete query.jsonOptions;
delete query.maxResumeRetries;
delete query.requestOptions;
delete query.types;

reqOpts = Object.assign(query, {
session: this.session.formattedName_!,
seqno: this._seqno++,
requestOptions: this.configureTagOptions(
typeof transaction.singleUse !== 'undefined',
this.requestOptions?.transactionTag!,
requestOptions
),
transaction,
params,
paramTypes,
Expand Down Expand Up @@ -1018,6 +1049,22 @@ export class Snapshot extends EventEmitter {
});
}

/**
*
* @private
*/
configureTagOptions(
singleUse?: boolean,
transactionTag?: string,
requestOptions = {}
): IRequestOptions | null {
if (!singleUse && transactionTag) {
(requestOptions as IRequestOptions).transactionTag = transactionTag;
}

return requestOptions!;
}

/**
* Transforms convenience options `keys` and `ranges` into a KeySet object.
*
Expand Down Expand Up @@ -1151,7 +1198,7 @@ export class Snapshot extends EventEmitter {
* that a callback is omitted.
*/
promisifyAll(Snapshot, {
exclude: ['end'],
exclude: ['configureTagOptions', 'end'],
});

/**
Expand Down Expand Up @@ -1312,12 +1359,14 @@ export class Transaction extends Dml {
constructor(
session: Session,
options = {} as spannerClient.spanner.v1.TransactionOptions.ReadWrite,
queryOptions?: IQueryOptions
queryOptions?: IQueryOptions,
requestOptions?: Pick<IRequestOptions, 'transactionTag'>
) {
super(session, undefined, queryOptions);

this._queuedMutations = [];
this._options = {readWrite: options};
this.requestOptions = requestOptions;
}

batchUpdate(
Expand Down Expand Up @@ -1435,9 +1484,13 @@ export class Transaction extends Dml {

const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = {
session: this.session.formattedName_!,
requestOptions: this.configureTagOptions(
false,
this.requestOptions?.transactionTag!,
(options as BatchUpdateOptions).requestOptions
),
transaction: {id: this.id!},
seqno: this._seqno++,
requestOptions: (options as BatchUpdateOptions).requestOptions,
statements,
} as spannerClient.spanner.v1.ExecuteBatchDmlRequest;

Expand Down Expand Up @@ -1595,6 +1648,10 @@ export class Transaction extends Dml {
) {
reqOpts.returnCommitStats = (options as CommitOptions).returnCommitStats;
}
reqOpts.requestOptions = Object.assign(
requestOptions || {},
this.requestOptions
);

this.request(
{
Expand Down
33 changes: 21 additions & 12 deletions test/mockserver/mockspanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -870,24 +870,33 @@ export class MockSpanner {
}
const session = this.sessions.get(call.request!.session);
if (session) {
const buffer = Buffer.from(call.request!.transactionId as string);
const transactionId = buffer.toString();
const fullTransactionId =
session.name + '/transactions/' + transactionId;
const transaction = this.transactions.get(fullTransactionId);
if (transaction) {
this.transactions.delete(fullTransactionId);
this.transactionOptions.delete(fullTransactionId);
if (call.request!.transactionId) {
const buffer = Buffer.from(call.request!.transactionId as string);
const transactionId = buffer.toString();
const fullTransactionId =
session.name + '/transactions/' + transactionId;
const transaction = this.transactions.get(fullTransactionId);
if (transaction) {
this.transactions.delete(fullTransactionId);
this.transactionOptions.delete(fullTransactionId);
callback(
null,
protobuf.CommitResponse.create({
commitTimestamp: now(),
})
);
} else {
callback(
MockSpanner.createTransactionNotFoundError(fullTransactionId)
);
}
} else if (call.request!.singleUseTransaction) {
callback(
null,
protobuf.CommitResponse.create({
commitTimestamp: now(),
})
);
} else {
callback(
MockSpanner.createTransactionNotFoundError(fullTransactionId)
);
}
} else {
callback(
Expand Down
Loading

0 comments on commit 4770dab

Please sign in to comment.