Skip to content

Commit

Permalink
feat: enable write retry and nack pending writes on reconnect (#443)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx authored May 3, 2024
1 parent f8849e8 commit ce4f88c
Show file tree
Hide file tree
Showing 5 changed files with 536 additions and 118 deletions.
23 changes: 23 additions & 0 deletions src/managedwriter/pending_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,41 @@ type AppendRowRequest =
export class PendingWrite {
private request: AppendRowRequest;
private response?: AppendRowsResponse;
private attempts: number;
private promise: Promise<AppendRowsResponse>;
private resolveFunc?: (response: AppendRowsResponse) => void;
private rejectFunc?: (reason?: protos.google.rpc.IStatus) => void;

/**
 * Creates a pending write for the given append request.
 *
 * The attempt counter starts at zero, and the settle callbacks of the
 * internal promise are captured so the write can be resolved or rejected
 * later from outside the promise executor.
 *
 * @param request append request tracked by this pending write
 */
constructor(request: AppendRowRequest) {
  this.attempts = 0;
  this.request = request;
  this.promise = new Promise((onResolve, onReject) => {
    // Keep both callbacks so the promise can be settled after construction.
    this.resolveFunc = onResolve;
    this.rejectFunc = onReject;
  });
}

/**
 * Records one more attempt for this pending write.
 *
 * The counter is post-incremented: the value handed back is the number of
 * attempts recorded before this call (0 on the first call), while the
 * stored counter ends up one higher.
 *
 * @private
 * @internal
 * @returns {number} number of attempts recorded prior to this call
 */
_increaseAttempts(): number {
  const previousAttempts = this.attempts;
  this.attempts = previousAttempts + 1;
  return previousAttempts;
}

/**
* Resolve pending write with error or AppendRowResponse.
* This resolves the promise accessed via GetResult()
*
* @see GetResult
*
* @private
* @internal
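* @param {Error | null} err error to reject the pending write with, if any
* @param {AppendRowsResponse} [response] response to resolve the pending write with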
*/
_markDone(err: Error | null, response?: AppendRowsResponse) {
if (err) {
this.rejectFunc && this.rejectFunc(err);
Expand Down
157 changes: 102 additions & 55 deletions src/managedwriter/stream_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import * as protos from '../../protos/protos';
import {WriterClient} from './writer_client';
import {PendingWrite} from './pending_write';
import {logger} from './logger';
import {parseStorageErrors} from './error';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type IInt64Value = protos.google.protobuf.IInt64Value;
Expand Down Expand Up @@ -56,6 +55,7 @@ export class StreamConnection extends EventEmitter {
private _streamId: string;
private _writeClient: WriterClient;
private _connection?: gax.CancellableStream | null;
private _lastConnectionError?: gax.GoogleError | null;
private _callOptions?: gax.CallOptions;
private _pendingWrites: PendingWrite[];

Expand All @@ -76,6 +76,7 @@ export class StreamConnection extends EventEmitter {
if (this.isOpen()) {
this.close();
}
this._lastConnectionError = null;
const callOptions = this.resolveCallOptions(
this._streamId,
this._callOptions
Expand All @@ -86,7 +87,23 @@ export class StreamConnection extends EventEmitter {
this._connection.on('data', this.handleData);
this._connection.on('error', this.handleError);
this._connection.on('close', () => {
this.trace('connection closed');
this.trace('connection closed', this._lastConnectionError);
if (this.hasPendingWrites()) {
const retrySettings = this._writeClient._retrySettings;
if (
retrySettings.enableWriteRetries &&
this.isRetryableError(this._lastConnectionError)
) {
this.reconnect();
this.resendAllPendingWrites();
} else {
const err = new gax.GoogleError(
'Connection failure, please retry the request'
);
err.code = gax.Status.UNAVAILABLE;
this.ackAllPendingWrites(err);
}
}
});
this._connection.on('pause', () => {
this.trace('connection paused');
Expand All @@ -106,62 +123,53 @@ export class StreamConnection extends EventEmitter {

private handleError = (err: gax.GoogleError) => {
this.trace('on error', err, JSON.stringify(err));
if (this.shouldReconnect(err)) {
this.reconnect();
return;
}
let nextPendingWrite = this.getNextPendingWrite();
if (this.isPermanentError(err)) {
this.trace('found permanent error', err);
while (nextPendingWrite) {
this.ackNextPendingWrite(err);
nextPendingWrite = this.getNextPendingWrite();
}
this.emit('error', err);
return;
}
if (this.isRequestError(err) && nextPendingWrite) {
this._lastConnectionError = err;
const nextPendingWrite = this.getNextPendingWrite();
if (nextPendingWrite) {
this.trace(
'found request error with pending write',
err,
nextPendingWrite
);
this.ackNextPendingWrite(err);
this.handleRetry(err);
}
if (this.listenerCount('error') === 0 && this.isRetryableError(err)) {
return;
}
this.emit('error', err);
};

private shouldReconnect(err: gax.GoogleError): boolean {
const reconnectionErrorCodes = [
gax.Status.UNAVAILABLE,
gax.Status.RESOURCE_EXHAUSTED,
/**
 * Decides what to do with the next pending write after an error.
 *
 * If write retries are disabled, or the error is not retryable, the next
 * pending write is acknowledged with the error. Otherwise the write is
 * taken off the queue and sent again, unless the connection is already
 * closed, in which case nothing is done here.
 *
 * @param err error reported for the in-flight write
 */
private handleRetry(err: gax.GoogleError) {
  const {enableWriteRetries} = this._writeClient._retrySettings;
  const canRetry = enableWriteRetries && this.isRetryableError(err);
  if (!canRetry) {
    this.ackNextPendingWrite(err);
    return;
  }
  if (this.isConnectionClosed()) {
    // NOTE(review): the write stays queued; presumably the connection's
    // 'close' listener resends it after reconnecting - confirm.
    return;
  }
  const pending = this._pendingWrites.pop()!;
  this.send(pending);
}

private isRetryableError(err?: gax.GoogleError | null): boolean {
if (!err) {
return false;
}
const errorCodes = [
gax.Status.ABORTED,
gax.Status.UNAVAILABLE,
gax.Status.CANCELLED,
gax.Status.DEADLINE_EXCEEDED,
gax.Status.INTERNAL,
gax.Status.DEADLINE_EXCEEDED,
];
return !!err.code && reconnectionErrorCodes.includes(err.code);
return !!err.code && errorCodes.includes(err.code);
}

private isPermanentError(err: gax.GoogleError): boolean {
if (err.code === gax.Status.INVALID_ARGUMENT) {
const storageErrors = parseStorageErrors(err);
for (const storageError of storageErrors) {
if (
storageError.errorMessage?.includes(
'Schema mismatch due to extra fields in user schema'
)
) {
return true;
}
}
private isConnectionClosed() {
if (this._connection) {
return this._connection.destroyed || this._connection.closed;
}
return false;
}

private isRequestError(err: gax.GoogleError): boolean {
return err.code === gax.Status.INVALID_ARGUMENT;
return true;
}

private resolveCallOptions(
Expand All @@ -183,15 +191,23 @@ export class StreamConnection extends EventEmitter {
}

private handleData = (response: AppendRowsResponse) => {
this.trace('data arrived', response);
const pw = this.getNextPendingWrite();
if (!pw) {
this.trace('data arrived', response, this._pendingWrites.length);
if (!this.hasPendingWrites()) {
this.trace('data arrived with no pending write available', response);
return;
}
if (response.updatedSchema) {
this.emit('schemaUpdated', response.updatedSchema);
}
const responseErr = response.error;
if (responseErr) {
const gerr = new gax.GoogleError(responseErr.message!);
gerr.code = responseErr.code!;
if (this.isRetryableError(gerr)) {
this.handleRetry(gerr);
return;
}
}
this.ackNextPendingWrite(null, response);
};

Expand Down Expand Up @@ -238,13 +254,38 @@ export class StreamConnection extends EventEmitter {
return this._streamId;
};

/**
 * Tells whether any write is still queued on this connection.
 *
 * @returns true when the pending write queue is not empty
 */
private hasPendingWrites(): boolean {
  const queued = this._pendingWrites.length;
  return queued > 0;
}

private getNextPendingWrite(): PendingWrite | null {
if (this._pendingWrites.length > 0) {
return this._pendingWrites[0];
return this._pendingWrites[this._pendingWrites.length - 1];
}
return null;
}

/**
 * Sends every currently queued pending write again.
 *
 * Works on a snapshot of the queue, walking it from the last element to
 * the first. For each entry, one element is popped from the live queue
 * before the write is handed back to send().
 */
private resendAllPendingWrites() {
  // Reverse a copy so iteration starts at the tail of the queue.
  const fromTail = [...this._pendingWrites].reverse();
  for (const pending of fromTail) {
    this._pendingWrites.pop(); // drop the matching entry from the live queue
    this.send(pending); // send() is expected to re-queue the write itself
  }
}

/**
 * Acknowledges every queued pending write with the same outcome.
 *
 * Keeps acknowledging the next pending write until the queue is empty.
 *
 * @param err error to finish each pending write with, or null
 * @param result optional response passed along to each pending write
 */
private ackAllPendingWrites(
  err: Error | null,
  result?:
    | protos.google.cloud.bigquery.storage.v1.IAppendRowsResponse
    | undefined
) {
  for (;;) {
    if (!this.hasPendingWrites()) {
      return;
    }
    this.ackNextPendingWrite(err, result);
  }
}

private ackNextPendingWrite(
err: Error | null,
result?:
Expand All @@ -253,6 +294,7 @@ export class StreamConnection extends EventEmitter {
) {
const pw = this._pendingWrites.pop();
if (pw) {
this.trace('ack pending write:', pw, err, result);
pw._markDone(err, result);
}
}
Expand All @@ -279,23 +321,27 @@ export class StreamConnection extends EventEmitter {
}

private send(pw: PendingWrite) {
const request = pw.getRequest();
if (!this._connection) {
pw._markDone(new Error('connection closed'));
const retrySettings = this._writeClient._retrySettings;
const tries = pw._increaseAttempts();
if (tries > retrySettings.maxRetryAttempts) {
pw._markDone(
new Error(`pending write max retries reached: ${tries} attempts`)
);
return;
}
if (this._connection.destroyed || this._connection.closed) {
if (this.isConnectionClosed()) {
this.reconnect();
}
this.trace('sending pending write', pw);
try {
this._connection.write(request, err => {
const request = pw.getRequest();
this._pendingWrites.unshift(pw);
this._connection?.write(request, err => {
this.trace('wrote pending write', err, this._pendingWrites.length);
if (err) {
pw._markDone(err); //TODO: add retries
return;
}
this._pendingWrites.unshift(pw);
});
} catch (err) {
pw._markDone(err as Error);
Expand All @@ -306,14 +352,16 @@ export class StreamConnection extends EventEmitter {
* Check if connection is open and ready to send requests.
*/
isOpen(): boolean {
return !!this._connection;
return !this.isConnectionClosed();
}

/**
* Reconnect and re send inflight requests.
* Re open appendRows BiDi gRPC connection.
*/
reconnect() {
this.trace('reconnect called');
this.trace(
`reconnect called with ${this._pendingWrites.length} pending writes`
);
this.close();
this.open();
}
Expand Down Expand Up @@ -347,7 +395,6 @@ export class StreamConnection extends EventEmitter {
async flushRows(request?: {
offset?: IInt64Value['value'];
}): Promise<FlushRowsResponse | null> {
this.close();
if (this.isDefaultStream()) {
return null;
}
Expand Down
2 changes: 1 addition & 1 deletion src/managedwriter/writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export class Writer {
offsetValue?: IInt64Value['value']
): PendingWrite {
let offset: AppendRowRequest['offset'];
if (offsetValue) {
if (offsetValue !== undefined && offsetValue !== null) {
offset = {
value: offsetValue,
};
Expand Down
37 changes: 37 additions & 0 deletions src/managedwriter/writer_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import {StreamConnection} from './stream_connection';
type StreamConnections = {
connectionList: StreamConnection[];
};
// Retry configuration held by WriterClient and read by StreamConnection
// when a write fails or the connection closes.
type RetrySettings = {
// When true, writes that fail with a retryable error are sent again
// instead of being acknowledged with the error.
enableWriteRetries: boolean;
// Upper bound on the attempt counter of a pending write; once exceeded,
// the pending write is finished with a "max retries reached" error.
maxRetryAttempts: number;
};
type CreateWriteStreamRequest =
protos.google.cloud.bigquery.storage.v1.ICreateWriteStreamRequest;
type BatchCommitWriteStreamsRequest =
Expand Down Expand Up @@ -55,6 +59,12 @@ export class WriterClient {
private _client: BigQueryWriteClient;
private _connections: StreamConnections;
private _open: boolean;
/**
* Retry settings, only internal for now.
* @private
* @internal
*/
_retrySettings: RetrySettings;

constructor(opts?: ClientOptions) {
const baseOptions = {
Expand All @@ -69,6 +79,10 @@ export class WriterClient {
connectionList: [],
};
this._open = false;
this._retrySettings = {
enableWriteRetries: false,
maxRetryAttempts: 4,
};
}

/**
Expand Down Expand Up @@ -102,6 +116,29 @@ export class WriterClient {
return this._open;
}

/**
 * Turns automatic retries of failed appends on or off for StreamConnections.
 *
 * Automatic retries fit best when the goal is at-least-once append
 * semantics. They may complicate designs that aim for exactly-once append
 * semantics.
 *
 * @param enable true to retry failed appends automatically, false otherwise
 */
enableWriteRetries(enable: boolean) {
  const settings = this._retrySettings;
  settings.enableWriteRetries = enable;
}

/**
 * Changes the maximum number of retry attempts used by child
 * StreamConnections.
 *
 * The default value is to retry 4 times.
 *
 * Only takes effect right now when write retries are enabled.
 * @see enableWriteRetries.
 *
 * @param retryAttempts new maximum number of retry attempts
 */
setMaxRetryAttempts(retryAttempts: number) {
  const settings = this._retrySettings;
  settings.maxRetryAttempts = retryAttempts;
}

/**
* Creates a write stream to the given table.
* Additionally, every table has a special stream named DefaultStream
Expand Down
Loading

0 comments on commit ce4f88c

Please sign in to comment.