From 577019f94c27f4cc50fa2d5a77c938a5a60dff0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 15 Jan 2020 11:43:36 +0100 Subject: [PATCH] fix: retry executeStreamingSql when error code is retryable (#795) fix: retry executeStreamingSql when error code is retryable The streaming call executeStreamingSql is not automatically retried by gax, as the gapic configuration for the call does not specify any error codes that should automatically be retried. Instead, the PartialResultStream is responsible for retrying these calls with the appropriate resume token. Until now, the call was only retried when a valid resume token had been seen for the stream, meaning that if the initial call failed with a retryable error code (e.g. UNAVAILABLE), the stream would fail with this error. This fix ensures that the call is also retried when the error occurs for the initial call or before the stream has returned a valid resume token. Fixes #620. --- src/partial-result-stream.ts | 27 +++++++-- test/partial-result-stream.ts | 108 +++++++++++++++++++++++++++++++++- 2 files changed, 128 insertions(+), 7 deletions(-) diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 9ad72d0b7..667da1cc8 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -25,6 +25,7 @@ import * as streamEvents from 'stream-events'; import {codec, JSONOptions, Json, Field, Value} from './codec'; import {SpannerClient as s} from './v1'; +import {ServiceError, status} from 'grpc'; export type ResumeToken = string | Uint8Array; @@ -342,7 +343,10 @@ export function partialResultStream( requestFn: RequestFunction, options?: RowOptions ): PartialResultStream { + const retryableCodes = [status.UNAVAILABLE]; let lastResumeToken: ResumeToken; + let lastRetriedErr: ServiceError | undefined; + let lastRequestStream: Readable; // mergeStream allows multiple streams to be connected into one. This is good; // if we need to retry a request and pipe more data to the user's stream. @@ -355,13 +359,25 @@ export function partialResultStream( }, }); + // This listener ensures that the last request that executed successfully + // after one or more retries will end the requestsStream. + const endListener = () => { + if (lastRetriedErr) { + setImmediate(() => requestsStream.end()); + } + }; const makeRequest = (): void => { - requestsStream.add(requestFn(lastResumeToken)); + if (lastRequestStream) { + lastRequestStream.removeListener('end', endListener); + } + lastRequestStream = requestFn(lastResumeToken); + lastRequestStream.on('end', endListener); + requestsStream.add(lastRequestStream); }; - const retry = (err: Error): void => { - if (!lastResumeToken) { - // We won't retry the request, so this will flush any rows the + const retry = (err: ServiceError): void => { + if (!(err.code && retryableCodes!.includes(err.code))) { + // This is not a retryable error, so this will flush any rows the // checkpoint stream has queued. After that, we will destroy the // user's stream with the same error. setImmediate(() => batchAndSplitOnTokenStream.destroy(err)); @@ -369,6 +385,9 @@ export function partialResultStream( } // We're going to retry from where we left off. + // Keep track of the fact that we retried an error in order to end the + // merged result stream. + lastRetriedErr = err; // Empty queued rows on the checkpoint stream (will not emit them to user). batchAndSplitOnTokenStream.reset(); makeRequest(); diff --git a/test/partial-result-stream.ts b/test/partial-result-stream.ts index 9e496e9ce..4cfed6b80 100644 --- a/test/partial-result-stream.ts +++ b/test/partial-result-stream.ts @@ -24,6 +24,8 @@ import * as through from 'through2'; import {codec} from '../src/codec'; import * as prs from '../src/partial-result-stream'; +import {ServiceError, status} from 'grpc'; +import {Row} from '../src/partial-result-stream'; describe('PartialResultStream', () => { const sandbox = sinon.createSandbox(); @@ -250,7 +252,62 @@ describe('PartialResultStream', () => { ); }); - it('should resume if there was an error', done => { + it('should retry if the initial call returned a retryable error', done => { + // This test will emit two rows total: + // - UNAVAILABLE error (should retry) + // - Two rows + // - Confirm all rows were received. + const fakeCheckpointStream = through.obj(); + // tslint:disable-next-line no-any + const resetStub = ((fakeCheckpointStream as any).reset = () => {}); + sandbox.stub(checkpointStream, 'obj').returns(fakeCheckpointStream); + + const firstFakeRequestStream = through.obj(); + const secondFakeRequestStream = through.obj(); + + const requestFnStub = sandbox.stub(); + + requestFnStub.onCall(0).callsFake(() => { + setTimeout(() => { + // This causes a new request stream to be created. + firstFakeRequestStream.emit('error', { + code: status.UNAVAILABLE, + message: 'Error.', + } as ServiceError); + }, 50); + + return firstFakeRequestStream; + }); + + requestFnStub.onCall(1).callsFake(resumeToken => { + assert.ok( + !resumeToken, + 'Retry should be called with empty resume token' + ); + + setTimeout(() => { + secondFakeRequestStream.push(RESULT_WITH_TOKEN); + fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN); + secondFakeRequestStream.push(RESULT_WITH_TOKEN); + fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN); + + secondFakeRequestStream.end(); + }, 500); + + return secondFakeRequestStream; + }); + + partialResultStream(requestFnStub) + .on('error', done) + .pipe( + concat(rows => { + assert.strictEqual(rows.length, 2); + done(); + }) + ); + }); + + it('should resume if there was a retryable error', done => { // This test will emit four rows total: // - Two rows // - Error event (should retry) @@ -275,8 +332,10 @@ describe('PartialResultStream', () => { setTimeout(() => { // This causes a new request stream to be created. - firstFakeRequestStream.emit('error', new Error('Error.')); - firstFakeRequestStream.end(); + firstFakeRequestStream.emit('error', { + code: status.UNAVAILABLE, + message: 'Error.', + } as ServiceError); }, 50); }, 50); @@ -308,6 +367,49 @@ describe('PartialResultStream', () => { ); }); + it('should emit non-retryable error', done => { + // This test will emit two rows and then an error. + const fakeCheckpointStream = through.obj(); + // tslint:disable-next-line no-any + const resetStub = ((fakeCheckpointStream as any).reset = () => {}); + sandbox.stub(checkpointStream, 'obj').returns(fakeCheckpointStream); + + const fakeRequestStream = through.obj(); + + const requestFnStub = sandbox.stub(); + + requestFnStub.onCall(0).callsFake(() => { + setTimeout(() => { + fakeRequestStream.push(RESULT_WITH_TOKEN); + fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN); + fakeRequestStream.push(RESULT_WITH_TOKEN); + fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN); + + setTimeout(() => { + fakeRequestStream.emit('error', { + code: status.DATA_LOSS, + message: 'Non-retryable error.', + } as ServiceError); + }, 50); + }, 50); + + return fakeRequestStream; + }); + + const receivedRows: Row[] = []; + partialResultStream(requestFnStub) + .on('data', row => { + receivedRows.push(row); + }) + .on('error', err => { + // We should receive two rows before we get an error. + assert.strictEqual(receivedRows.length, 2); + assert.strictEqual(err.code, status.DATA_LOSS); + assert.strictEqual(requestFnStub.callCount, 1); + done(); + }); + }); + it('should emit rows and error when there is no token', done => { const expectedRow = sinon.match(EXPECTED_ROW); const error = new Error('Error.');