Skip to content

Commit

Permalink
fix: retry executeStreamingSql when error code is retryable (googleapis#795)
Browse files Browse the repository at this point in the history

fix: retry executeStreamingSql when error code is retryable

The streaming call executeStreamingSql is not automatically retried by gax, as the gapic configuration for the call does not specify any error codes that should automatically be retried. Instead, the PartialResultStream is responsible for retrying these calls with the appropriate resume token. Until now, the call was only retried when a valid resume token had been seen for the stream, meaning that if the initial call failed with a retryable error code (e.g. UNAVAILABLE), the stream would fail with this error. This fix ensures that the call is also retried when the error occurs for the initial call or before the stream has returned a valid resume token.

Fixes googleapis#620.
  • Loading branch information
olavloite authored and AVaksman committed Jan 16, 2020
1 parent 5aabbd5 commit 577019f
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 7 deletions.
27 changes: 23 additions & 4 deletions src/partial-result-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import * as streamEvents from 'stream-events';

import {codec, JSONOptions, Json, Field, Value} from './codec';
import {SpannerClient as s} from './v1';
import {ServiceError, status} from 'grpc';

export type ResumeToken = string | Uint8Array;

Expand Down Expand Up @@ -342,7 +343,10 @@ export function partialResultStream(
requestFn: RequestFunction,
options?: RowOptions
): PartialResultStream {
const retryableCodes = [status.UNAVAILABLE];
let lastResumeToken: ResumeToken;
let lastRetriedErr: ServiceError | undefined;
let lastRequestStream: Readable;

// mergeStream allows multiple streams to be connected into one. This is
// useful if we need to retry a request and pipe more data to the user's stream.
Expand All @@ -355,20 +359,35 @@ export function partialResultStream(
},
});

// This listener ensures that the last request that executed successfully
// after one or more retries will end the requestsStream.
const endListener = () => {
if (lastRetriedErr) {
setImmediate(() => requestsStream.end());
}
};
const makeRequest = (): void => {
requestsStream.add(requestFn(lastResumeToken));
if (lastRequestStream) {
lastRequestStream.removeListener('end', endListener);
}
lastRequestStream = requestFn(lastResumeToken);
lastRequestStream.on('end', endListener);
requestsStream.add(lastRequestStream);
};

const retry = (err: Error): void => {
if (!lastResumeToken) {
// We won't retry the request, so this will flush any rows the
const retry = (err: ServiceError): void => {
if (!(err.code && retryableCodes!.includes(err.code))) {
// This is not a retryable error, so this will flush any rows the
// checkpoint stream has queued. After that, we will destroy the
// user's stream with the same error.
setImmediate(() => batchAndSplitOnTokenStream.destroy(err));
return;
}

// We're going to retry from where we left off.
// Keep track of the fact that we retried an error in order to end the
// merged result stream.
lastRetriedErr = err;
// Empty queued rows on the checkpoint stream (will not emit them to user).
batchAndSplitOnTokenStream.reset();
makeRequest();
Expand Down
108 changes: 105 additions & 3 deletions test/partial-result-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import * as through from 'through2';

import {codec} from '../src/codec';
import * as prs from '../src/partial-result-stream';
import {ServiceError, status} from 'grpc';
import {Row} from '../src/partial-result-stream';

describe('PartialResultStream', () => {
const sandbox = sinon.createSandbox();
Expand Down Expand Up @@ -250,7 +252,62 @@ describe('PartialResultStream', () => {
);
});

it('should resume if there was an error', done => {
it('should retry if the initial call returned a retryable error', done => {
  // Scenario: the very first request fails before any resume token was seen.
  //   1. The first request stream emits an UNAVAILABLE error (retryable).
  //   2. The request function is invoked again, without a resume token.
  //   3. The second request stream delivers two rows and then ends.
  //   4. The user's stream must receive exactly those two rows.
  const fakeCheckpointStream = through.obj();
  // The retry path calls reset() on its checkpoint stream (presumably the one
  // stubbed here — confirm against partialResultStream). A plain through2
  // stream has no such method, so install a no-op.
  // tslint:disable-next-line no-any
  (fakeCheckpointStream as any).reset = () => {};
  sandbox.stub(checkpointStream, 'obj').returns(fakeCheckpointStream);

  const firstFakeRequestStream = through.obj();
  const secondFakeRequestStream = through.obj();

  const requestFnStub = sandbox.stub();

  requestFnStub.onCall(0).callsFake(() => {
    setTimeout(() => {
      // This causes a new request stream to be created.
      firstFakeRequestStream.emit('error', {
        code: status.UNAVAILABLE,
        message: 'Error.',
      } as ServiceError);
    }, 50);

    return firstFakeRequestStream;
  });

  requestFnStub.onCall(1).callsFake(resumeToken => {
    // No row was delivered by the first stream, so there is nothing to
    // resume from: the retry must restart the query from the beginning.
    assert.ok(
      !resumeToken,
      'Retry should be called with empty resume token'
    );

    setTimeout(() => {
      secondFakeRequestStream.push(RESULT_WITH_TOKEN);
      fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN);
      secondFakeRequestStream.push(RESULT_WITH_TOKEN);
      fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN);

      secondFakeRequestStream.end();
    }, 500);

    return secondFakeRequestStream;
  });

  partialResultStream(requestFnStub)
    .on('error', done)
    .pipe(
      concat(rows => {
        assert.strictEqual(rows.length, 2);
        // Exactly one retry: the failed initial call plus the successful one.
        assert.strictEqual(requestFnStub.callCount, 2);
        done();
      })
    );
});

it('should resume if there was a retryable error', done => {
// This test will emit four rows total:
// - Two rows
// - Error event (should retry)
Expand All @@ -275,8 +332,10 @@ describe('PartialResultStream', () => {

setTimeout(() => {
// This causes a new request stream to be created.
firstFakeRequestStream.emit('error', new Error('Error.'));
firstFakeRequestStream.end();
firstFakeRequestStream.emit('error', {
code: status.UNAVAILABLE,
message: 'Error.',
} as ServiceError);
}, 50);
}, 50);

Expand Down Expand Up @@ -308,6 +367,49 @@ describe('PartialResultStream', () => {
);
});

it('should emit non-retryable error', done => {
  // Scenario: two rows are delivered, then the request stream fails with an
  // error code that is not retryable (DATA_LOSS). The user's stream must
  // receive both rows first, then the error, and no second request is made.
  const fakeCheckpointStream = through.obj();
  // Install a no-op reset() so the fake matches the interface the sibling
  // retry tests rely on; this test does not expect it to be needed.
  // tslint:disable-next-line no-any
  (fakeCheckpointStream as any).reset = () => {};
  sandbox.stub(checkpointStream, 'obj').returns(fakeCheckpointStream);

  const fakeRequestStream = through.obj();

  const requestFnStub = sandbox.stub();

  requestFnStub.onCall(0).callsFake(() => {
    setTimeout(() => {
      fakeRequestStream.push(RESULT_WITH_TOKEN);
      fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN);
      fakeRequestStream.push(RESULT_WITH_TOKEN);
      fakeCheckpointStream.emit('checkpoint', RESULT_WITH_TOKEN);

      // Fail only after both rows have been pushed, on a later timer tick.
      setTimeout(() => {
        fakeRequestStream.emit('error', {
          code: status.DATA_LOSS,
          message: 'Non-retryable error.',
        } as ServiceError);
      }, 50);
    }, 50);

    return fakeRequestStream;
  });

  const receivedRows: Row[] = [];
  partialResultStream(requestFnStub)
    .on('data', row => {
      receivedRows.push(row);
    })
    .on('error', err => {
      // We should receive two rows before we get an error.
      assert.strictEqual(receivedRows.length, 2);
      assert.strictEqual(err.code, status.DATA_LOSS);
      // A single call proves the request was not retried.
      assert.strictEqual(requestFnStub.callCount, 1);
      done();
    });
});

it('should emit rows and error when there is no token', done => {
const expectedRow = sinon.match(EXPECTED_ROW);
const error = new Error('Error.');
Expand Down

0 comments on commit 577019f

Please sign in to comment.