Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable write retry and nack pending writes on reconnect #443

Merged
merged 26 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3cd69e9
fix: nack pending writes on reconnect
alvarowolfx Apr 22, 2024
7ba612c
fix: lint issues
alvarowolfx Apr 22, 2024
70f6972
fix: lint issues
alvarowolfx Apr 22, 2024
a751d77
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 22, 2024
527ab79
Merge branch 'fix-ack-on-reconnect' of https://github.com/alvarowolfx…
gcf-owl-bot[bot] Apr 22, 2024
4cfe085
feat: enable write retries on failed pending writes
alvarowolfx Apr 23, 2024
474e237
fix: lint issue
alvarowolfx Apr 23, 2024
50f2ae7
fix: only reconnect on server-side close event
alvarowolfx Apr 24, 2024
63ac809
fix: only reconnect on close if there are pending writes
alvarowolfx Apr 24, 2024
b60db53
feat: resend on retryable error
alvarowolfx Apr 25, 2024
254d96b
fix: do not emit error if it is retryable and no listeners are set up
alvarowolfx Apr 25, 2024
fb9f85f
fix: let permanent errors to nack individual pending writes
alvarowolfx Apr 25, 2024
94241d7
fix: remove unused import
alvarowolfx Apr 25, 2024
b51ea18
fix: grpc conn.write sequence is not stable
alvarowolfx Apr 26, 2024
4e7bcf3
feat: handle in-stream response error and retry RESOURCE_EXHAUSTED
alvarowolfx Apr 26, 2024
17c0e71
fix: lint issue
alvarowolfx Apr 26, 2024
841d174
fix: remove in stream handling and RE as retryable error
alvarowolfx Apr 26, 2024
762f5d6
fix: rollback changes to deleteDatasets method
alvarowolfx Apr 26, 2024
ac7af32
fix: reconnect only on retryable errors and handle in stream errors
alvarowolfx Apr 26, 2024
2f2e600
feat: log number of pending writes on reconnect
alvarowolfx Apr 29, 2024
f35bc48
fix: reconnect trace msg
alvarowolfx Apr 29, 2024
62ff5dd
fix: don't close conn on flush
alvarowolfx Apr 30, 2024
a921b9f
fix: rename var/properties for clarity
alvarowolfx Apr 30, 2024
1e4e851
fix: address review comments
alvarowolfx May 2, 2024
4c01180
fix: return after nack pending write due to max retries
alvarowolfx May 2, 2024
de07922
fix: change connect error msg and code
alvarowolfx May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions src/managedwriter/stream_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export class StreamConnection extends EventEmitter {
this._connection.on('error', this.handleError);
this._connection.on('close', () => {
this.trace('connection closed');
this.close();
});
this._connection.on('pause', () => {
this.trace('connection paused');
Expand All @@ -107,20 +108,19 @@ export class StreamConnection extends EventEmitter {
private handleError = (err: gax.GoogleError) => {
this.trace('on error', err, JSON.stringify(err));
if (this.shouldReconnect(err)) {
err.message = 'reconnect triggered due to: ' + err.message;
this.ackAllPendingWrites(err);
this.reconnect();
return;
}
let nextPendingWrite = this.getNextPendingWrite();
if (this.isPermanentError(err)) {
this.trace('found permanent error', err);
while (nextPendingWrite) {
this.ackNextPendingWrite(err);
nextPendingWrite = this.getNextPendingWrite();
}
this.ackAllPendingWrites(err);
this.emit('error', err);
return;
}
if (this.isRequestError(err) && nextPendingWrite) {
const nextPendingWrite = this.getNextPendingWrite();
if (nextPendingWrite) {
this.trace(
'found request error with pending write',
err,
Expand All @@ -144,6 +144,13 @@ export class StreamConnection extends EventEmitter {
return !!err.code && reconnectionErrorCodes.includes(err.code);
}

/**
 * Reports whether the underlying connection cannot be used.
 *
 * @returns `true` when no connection object is set; otherwise the
 * connection's `destroyed` flag or, failing that, its `closed` flag.
 */
private isConnectionClosed() {
  const conn = this._connection;
  if (!conn) {
    return true;
  }
  return conn.destroyed || conn.closed;
}

private isPermanentError(err: gax.GoogleError): boolean {
if (err.code === gax.Status.INVALID_ARGUMENT) {
const storageErrors = parseStorageErrors(err);
Expand All @@ -160,10 +167,6 @@ export class StreamConnection extends EventEmitter {
return false;
}

/**
 * Tells whether the given error carries the INVALID_ARGUMENT status code.
 *
 * @param err Error to inspect.
 * @returns `true` only when `err.code` equals `gax.Status.INVALID_ARGUMENT`.
 */
private isRequestError(err: gax.GoogleError): boolean {
  const {code} = err;
  return code === gax.Status.INVALID_ARGUMENT;
}

private resolveCallOptions(
streamId: string,
options?: gax.CallOptions
Expand Down Expand Up @@ -245,6 +248,19 @@ export class StreamConnection extends EventEmitter {
return null;
}

/**
 * Acknowledges every pending write with the same outcome.
 *
 * Calls `ackNextPendingWrite(err, result)` repeatedly for as long as
 * `getNextPendingWrite()` keeps returning a truthy value.
 * NOTE(review): this relies on `ackNextPendingWrite` removing the head of
 * the queue on each call; otherwise the loop would never end — confirm.
 *
 * @param err Error forwarded to each pending write, or `null`.
 * @param result Optional append response forwarded to each pending write.
 */
private ackAllPendingWrites(
  err: Error | null,
  result?:
    | protos.google.cloud.bigquery.storage.v1.IAppendRowsResponse
    | undefined
) {
  while (this.getNextPendingWrite()) {
    this.ackNextPendingWrite(err, result);
  }
}

private ackNextPendingWrite(
err: Error | null,
result?:
Expand Down Expand Up @@ -280,16 +296,12 @@ export class StreamConnection extends EventEmitter {

private send(pw: PendingWrite) {
const request = pw.getRequest();
if (!this._connection) {
pw._markDone(new Error('connection closed'));
return;
}
if (this._connection.destroyed || this._connection.closed) {
if (this.isConnectionClosed()) {
this.reconnect();
}
this.trace('sending pending write', pw);
try {
this._connection.write(request, err => {
this._connection?.write(request, err => {
this.trace('wrote pending write', err, this._pendingWrites.length);
if (err) {
pw._markDone(err); //TODO: add retries
Expand All @@ -306,11 +318,11 @@ export class StreamConnection extends EventEmitter {
* Check if connection is open and ready to send requests.
*/
isOpen(): boolean {
return !!this._connection;
return !this.isConnectionClosed();
}

/**
* Reconnect and re send inflight requests.
* Re open appendRows BiDi gRPC connection.
*/
reconnect() {
this.trace('reconnect called');
Expand Down
94 changes: 82 additions & 12 deletions system-test/managed_writer_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

import * as assert from 'assert';
import {describe, it, xit} from 'mocha';
import {describe, it} from 'mocha';
import * as uuid from 'uuid';
import * as gax from 'google-gax';
import * as sinon from 'sinon';
Expand Down Expand Up @@ -51,6 +51,11 @@ const generateUuid = () =>
`${GCLOUD_TESTS_PREFIX}_${uuid.v4()}`.replace(/-/gi, '_');
const datasetId = generateUuid();

/**
 * Returns a promise that resolves (with no value) once a `setTimeout`
 * of `ms` milliseconds fires.
 */
const sleep = (ms: number) =>
  new Promise(resolve => setTimeout(resolve, ms));

const root = protobuf.Root.fromJSON(customerRecordProtoJson);
if (!root) {
throw Error('Proto must not be undefined');
Expand Down Expand Up @@ -1172,7 +1177,7 @@ describe('managedwriter.WriterClient', () => {
}
});

xit('reconnect on idle connection', async () => {
it('reconnect on idle connection', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.setClient(bqWriteClient);
Expand Down Expand Up @@ -1207,15 +1212,17 @@ describe('managedwriter.WriterClient', () => {
let pw = writer.appendRows([row1, row2], 0);
await pw.getResult();

const sleep = (ms: number) =>
new Promise(resolve => {
setTimeout(resolve, ms);
});
const minutes = 10;
for (let i = 0; i <= minutes; i++) {
console.log('sleeping for a minute: ', minutes - i, 'to go');
await sleep(60 * 1000);
}
// Simulate server sending ABORT error as the conn was idle
const conn = connection['_connection'] as gax.CancellableStream; // private method
const gerr = new gax.GoogleError(
'Closing the stream because it has been inactive for 600 seconds'
);
gerr.code = gax.Status.ABORTED;
conn.emit('error', gerr);
// simulate server closing conn.
await sleep(100);
conn.destroy();
await sleep(100);

const row3 = {
customer_name: 'Test',
Expand All @@ -1234,7 +1241,70 @@ describe('managedwriter.WriterClient', () => {
} finally {
client.close();
}
}).timeout(20 * 60 * 1000);
}).timeout(20 * 1000);

/**
 * Checks that a write still waiting for its response is rejected when the
 * stream reports an error.
 *
 * Flow: append one row and wait for its result, append a second row without
 * awaiting it, intercept the next 'data' event so it never reaches the
 * connection's handler, then emit a RESOURCE_EXHAUSTED error and destroy the
 * stream. The second write must have been rejected with an error whose
 * message contains 'reconnect'.
 */
it('should mark any pending writes with error if connection was closed', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.setClient(bqWriteClient);

const row1 = {
customer_name: 'Ada Lovelace',
row_num: 1,
};

try {
const connection = await client.createStreamConnection({
streamType: managedwriter.PendingStream,
destinationTable: parent,
});

const writer = new JSONWriter({
connection,
protoDescriptor,
});

// First append (offset 0) is awaited before the failure is simulated.
const pw1 = writer.appendRows([row1], 0);
await pw1.getResult();

// Second row, appended at offset 1; its result is never awaited directly.
const row2 = {
customer_name: 'Test',
row_num: 2,
// NOTE(review): this value looks like an e-mail obfuscation placeholder
// left by the page scrape rather than a real address — confirm.
customer_email: '[email protected]',
};

// Capture the rejection instead of awaiting, so the test can go on to
// inject the error while the write is still outstanding.
let foundError: gax.GoogleError | null = null;
const pw2 = writer.appendRows([row2], 1);
pw2.getResult().catch(err => {
foundError = err as gax.GoogleError;
});

// Simulate the server sending a RESOURCE_EXHAUSTED error on a write.
// `_connection` is a non-public member, reached through bracket access.
const conn = connection['_connection'] as gax.CancellableStream; // private method
// Swallow the ack for the last appendRows call so it looks unanswered:
// drop every 'data' listener, consume the next 'data' event with a one-off
// listener, then re-attach the connection's own handler.
conn.removeAllListeners('data');
await new Promise(resolve => conn.once('data', resolve));
conn.addListener('data', connection['handleData']);

const gerr = new gax.GoogleError('memory limit exceeded');
gerr.code = gax.Status.RESOURCE_EXHAUSTED;
conn.emit('error', gerr);
// Simulate the server closing the connection; the short sleeps presumably
// give the error/close handlers time to run — TODO confirm.
await sleep(100);
conn.destroy();
await sleep(100);

// The outstanding write must have been rejected, with a message mentioning
// the reconnect.
// NOTE(review): presumably this matches a 'reconnect triggered due to: '
// prefix added by the connection's error handler — confirm.
assert.notEqual(foundError, null);
assert.equal(foundError!.message.includes('reconnect'), true);

connection.close();
writer.close();
} finally {
client.close();
}
});
});

describe('close', () => {
Expand Down
Loading