refactor!: rewrite streaming logic #136

Merged
merged 2 commits into from
Jul 12, 2019
4 changes: 1 addition & 3 deletions package.json
@@ -52,9 +52,7 @@
},
"dependencies": {
"arrify": "^2.0.0",
-"extend": "^3.0.1",
-"split-array-stream": "^2.0.0",
-"stream-events": "^1.0.4"
+"extend": "^3.0.1"
},
"engines": {
"node": ">=8.10.0"
136 changes: 14 additions & 122 deletions src/index.ts
@@ -20,83 +20,8 @@

import arrify = require('arrify');
import * as extend from 'extend';
-import {split} from 'split-array-stream';
-import {Transform, TransformOptions} from 'stream';
-import * as streamEvents from 'stream-events';
-
-export interface CreateLimiterOptions {
-/**
-* The maximum number of API calls to make.
-*/
-maxApiCalls?: number;
-
-/**
-* Options to pass to the Stream constructor.
-*/
-streamOptions?: TransformOptions;
-}
-
-export interface Limiter {
-// tslint:disable-next-line no-any
-makeRequest(...args: any[]): Transform | undefined;
-stream: Transform;
-}
-
-export type ResourceStream<T> = {
-addListener(event: 'data', listener: (data: T) => void): ResourceStream<T>;
-emit(event: 'data', data: T): boolean;
-on(event: 'data', listener: (data: T) => void): ResourceStream<T>;
-once(event: 'data', listener: (data: T) => void): ResourceStream<T>;
-prependListener(
-event: 'data',
-listener: (data: T) => void
-): ResourceStream<T>;
-prependOnceListener(
-event: 'data',
-listener: (data: T) => void
-): ResourceStream<T>;
-removeListener(event: 'data', listener: (data: T) => void): ResourceStream<T>;
-} & Transform;
-
-/**
-* Limit requests according to a `maxApiCalls` limit.
-*
-* @param {function} makeRequestFn - The function that will be called.
-* @param {object=} options - Configuration object.
-* @param {number} options.maxApiCalls - The maximum number of API calls to make.
-* @param {object} options.streamOptions - Options to pass to the Stream constructor.
-*/
-export function createLimiter(
-makeRequestFn: Function,
-options?: CreateLimiterOptions
-): Limiter {
-options = options || {};
-
-const streamOptions = options.streamOptions || {};
-streamOptions.objectMode = true;
-const stream = streamEvents(new Transform(streamOptions)) as Transform;
-
-let requestsMade = 0;
-let requestsToMake = -1;
-
-if (typeof options.maxApiCalls === 'number') {
-requestsToMake = options.maxApiCalls!;
-}
-
-return {
-// tslint:disable-next-line:no-any
-makeRequest(...args: any[]) {
-requestsMade++;
-if (requestsToMake >= 0 && requestsMade > requestsToMake) {
-stream.push(null);
-return;
-}
-makeRequestFn.apply(null, args);
-return stream;
-},
-stream,
-};
-}
+import {TransformOptions} from 'stream';
+import {ResourceStream} from './resource-stream';

export interface ParsedArguments extends TransformOptions {
/**
@@ -196,7 +121,10 @@ export class Paginator {
): ResourceStream<T> {
const parsedArguments = paginator.parseArguments_(args);
const originalMethod = this[methodName + '_'] || this[methodName];
-return paginator.runAsStream_(parsedArguments, originalMethod.bind(this));
+return paginator.runAsStream_<T>(
+parsedArguments,
+originalMethod.bind(this)
+);
};
}

@@ -327,52 +255,16 @@ export class Paginator {
* and returns `nextQuery` to receive more results.
* @return {stream} - Readable object stream.
*/
-runAsStream_(parsedArguments: ParsedArguments, originalMethod: Function) {
-const query = parsedArguments.query;
-let resultsToSend = parsedArguments.maxResults!;
-
-const limiter = exports.createLimiter(makeRequest, {
-maxApiCalls: parsedArguments.maxApiCalls,
-streamOptions: parsedArguments.streamOptions,
-});
-
-const stream = limiter.stream;
-
-stream.once('reading', () => {
-limiter.makeRequest(query);
-});
-
-function makeRequest(query?: ParsedArguments | string) {
-originalMethod(query, onResultSet);
-}
-
-// tslint:disable-next-line:no-any
-function onResultSet(err: Error | null, results?: any[], nextQuery?: any) {
-if (err) {
-stream.destroy(err);
-return;
-}
-
-if (resultsToSend >= 0 && results!.length > resultsToSend) {
-results = results!.splice(0, resultsToSend);
-}
-
-resultsToSend -= results!.length;
-
-split(results!, stream).then(streamEnded => {
-if (streamEnded) {
-return;
-}
-if (nextQuery && resultsToSend !== 0) {
-limiter.makeRequest(nextQuery);
-return;
-}
-stream.push(null);
-});
-}
-return limiter.stream;
+// tslint:disable-next-line:no-any
+runAsStream_<T = any>(
+parsedArguments: ParsedArguments,
+originalMethod: Function
+): ResourceStream<T> {
+return new ResourceStream<T>(parsedArguments, originalMethod);
stephenplusplus (Contributor) commented on Jul 11, 2019:

This is a rough draft of how this could look if the split-array-stream changes go through:

runAsStream_ (parsedArguments, originalMethod) {
  let numMaxApiCalls = parsedArguments.maxApiCalls === -1 ? Infinity : parsedArguments.maxApiCalls;
  let numRequestsMade = 0;
  let numResultsToSend = parsedArguments.maxResults === -1 ? Infinity  : parsedArguments.maxResults;

  let query = parsedArguments.query;

  const splitStream = new SplitArrayStream(getPage);

  function getPage() {
    return new Promise((resolve, reject) => {
      originalMethod(query, (err, results, nextQuery) => {
        if (err) {
          splitStream.destroy(err);
          return;
        }

        if (numResultsToSend !== Infinity) {
          results = results.splice(0, numResultsToSend);
          numResultsToSend -= results.length;
        }

        const didMakeMaxCalls = ++numRequestsMade >= numMaxApiCalls;

        if (didMakeMaxCalls) {
          results.push(null);
        } else {
          query = nextQuery;
        }

        resolve(results);
      });
    });
  }

  return splitStream;
}

Contributor Author:


@stephenplusplus do you have a timeline of when this would be finished?

Contributor:


I can probably wrap up stephenplusplus/split-array-stream#4 tomorrow.
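
The limit bookkeeping that the draft above and `ResourceStream` both perform (treating `-1` as "no limit", truncating a page to the remaining `maxResults`, and stopping after `maxApiCalls` requests) could be isolated roughly as follows. This is only a sketch: `toLimit`, `PageState`, and `applyPage` are hypothetical names that do not appear in the PR, and treating `undefined` like `-1` is an assumption made here for convenience.

```typescript
// Hypothetical helper: -1 (and, as an assumption of this sketch, undefined)
// means "no limit", which is represented as Infinity.
function toLimit(value?: number): number {
  return value === undefined || value === -1 ? Infinity : value;
}

// Hypothetical state object mirroring the counters kept by the stream.
interface PageState {
  resultsToSend: number;
  requestsMade: number;
  maxApiCalls: number;
}

// Applies one page of results to the counters and reports which items to
// emit and whether paging should stop.
function applyPage<T>(
  state: PageState,
  results: T[],
  nextQuery: unknown
): {items: T[]; done: boolean} {
  // Truncate the page to the number of results still allowed.
  const items =
    state.resultsToSend === Infinity
      ? results
      : results.slice(0, state.resultsToSend);
  state.resultsToSend -= items.length;
  state.requestsMade++;

  const done =
    !nextQuery ||
    state.resultsToSend < 1 ||
    state.requestsMade >= state.maxApiCalls;
  return {items, done};
}
```

With `maxResults` of 3 and no call limit, a first page of two items is passed through and a second page of two items is cut to one, at which point `done` becomes true.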

}
}

const paginator = new Paginator();
export {paginator};
+
+export {ResourceStream};
101 changes: 101 additions & 0 deletions src/resource-stream.ts
@@ -0,0 +1,101 @@
/*!
* Copyright 2019 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {Transform} from 'stream';
import {ParsedArguments} from './';

interface ResourceEvents<T> {
Contributor:

Why do we have to redefine these standard event-emitter/stream methods?

Contributor Author:

Otherwise the type for 'data' will just be `any`, and users would have to set a type explicitly instead of having one inferred.
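
To illustrate the inference being described, here is a minimal sketch that uses the intersection-type form removed from `src/index.ts` in this PR, trimmed to `on` only. `Row`, `TypedStream`, `createTypedStream`, and `collectNames` are hypothetical names for this illustration, and a `PassThrough` stands in for the real stream. The expectation is that the `'data'` listener's parameter is inferred from the first matching signature, so no annotation is needed on `row`.

```typescript
import {PassThrough} from 'stream';

// Hypothetical row type, used only for this illustration.
interface Row {
  name: string;
}

// Same shape as the intersection type removed from src/index.ts,
// reduced to the `on` overload for brevity.
type TypedStream<T> = {
  on(event: 'data', listener: (data: T) => void): TypedStream<T>;
} & PassThrough;

// Hypothetical factory: an object-mode PassThrough viewed as a typed stream.
function createTypedStream<T>(): TypedStream<T> {
  return new PassThrough({objectMode: true}) as TypedStream<T>;
}

function collectNames(rows: Row[]): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const stream = createTypedStream<Row>();
    const names: string[] = [];
    // `row` carries no annotation; it should be inferred as Row from the
    // typed 'data' signature rather than falling back to `any`.
    stream.on('data', row => {
      names.push(row.name);
    });
    stream.on('end', () => resolve(names));
    stream.on('error', reject);
    rows.forEach(row => stream.write(row));
    stream.end();
  });
}
```

Without the extra `'data'` signature, the listener parameter would come from Node's own typings, where the chunk is typed as `any`.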

addListener(event: 'data', listener: (data: T) => void): this;
emit(event: 'data', data: T): boolean;
on(event: 'data', listener: (data: T) => void): this;
once(event: 'data', listener: (data: T) => void): this;
prependListener(event: 'data', listener: (data: T) => void): this;
prependOnceListener(event: 'data', listener: (data: T) => void): this;
removeListener(event: 'data', listener: (data: T) => void): this;
}

export class ResourceStream<T> extends Transform implements ResourceEvents<T> {
_ended: boolean;
_maxApiCalls: number;
_nextQuery: {} | null;
_reading: boolean;
_requestFn: Function;
_requestsMade: number;
_resultsToSend: number;
constructor(args: ParsedArguments, requestFn: Function) {
const options = Object.assign({objectMode: true}, args.streamOptions);
super(options);

this._ended = false;
this._maxApiCalls = args.maxApiCalls === -1 ? Infinity : args.maxApiCalls!;
this._nextQuery = args.query!;
this._reading = false;
this._requestFn = requestFn;
this._requestsMade = 0;
this._resultsToSend = args.maxResults === -1 ? Infinity : args.maxResults!;
}
// tslint:disable-next-line:no-any
end(...args: any[]) {
this._ended = true;
return super.end(...args);
}
_read() {
if (this._reading) {
return;
}

this._reading = true;

this._requestFn(
this._nextQuery,
(err: Error | null, results: T[], nextQuery: {} | null) => {
if (err) {
this.destroy(err);
return;
}

this._nextQuery = nextQuery;

if (this._resultsToSend !== Infinity) {
results = results.splice(0, this._resultsToSend);
this._resultsToSend -= results.length;
}

let more = true;

for (const result of results) {
if (this._ended) {
break;
}
more = this.push(result);
}

const isFinished = !this._nextQuery || this._resultsToSend < 1;
const madeMaxCalls = ++this._requestsMade >= this._maxApiCalls;

if (isFinished || madeMaxCalls) {
this.end();
}

if (more && !this._ended) {
setImmediate(() => this._read());
Contributor:

Do we need to call _read() again ourselves? As I understand it, the internals will ask for more data on their own (by calling _read()), so the Readable implementor doesn't have to chain a long series of reads together manually.

callmehiphop (Contributor Author) commented on Jul 15, 2019:

As I understand it, it will call _read when the user first requests data, and subsequent reads happen once the buffer, having been full, is safe to read into again. In this case the buffer is not full, so if we don't call it again ourselves, I believe we'll end up in a state where it hangs forever.
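
One case where a manual re-read does seem necessary can be sketched with a plain `Readable`: Node's documentation for `readable._read()` notes that once `_read()` has been called, it is not called again until more data is pushed. If a page comes back empty, nothing is pushed, so the stream has to schedule the next fetch itself. The sketch below is not the PR's implementation; `PagedStream`, `fetchPage`, `PAGES`, and `collect` are hypothetical, and the backend is an in-memory array with a deliberately empty middle page.

```typescript
import {Readable} from 'stream';

type Page = {items: number[]; nextToken: number | null};

// Hypothetical paged backend: three pages, the middle one empty.
const PAGES: Page[] = [
  {items: [1, 2], nextToken: 1},
  {items: [], nextToken: 2},
  {items: [3], nextToken: null},
];

// Simulates an asynchronous API call.
function fetchPage(token: number, cb: (page: Page) => void): void {
  setImmediate(() => cb(PAGES[token]));
}

class PagedStream extends Readable {
  private token: number | null = 0;
  private fetching = false;

  constructor() {
    super({objectMode: true});
  }

  _read(): void {
    // Guard against overlapping requests and against reading past the end.
    if (this.fetching || this.token === null) {
      return;
    }
    this.fetching = true;
    fetchPage(this.token, page => {
      this.fetching = false;
      this.token = page.nextToken;
      for (const item of page.items) {
        this.push(item);
      }
      if (this.token === null) {
        // No more pages: signal the end of the stream.
        this.push(null);
      } else if (page.items.length === 0) {
        // Nothing was pushed, so Node will not call _read() again by itself.
        setImmediate(() => this._read());
      }
    });
  }
}

// Gathers everything the stream emits into an array.
function collect(stream: Readable): Promise<number[]> {
  return new Promise((resolve, reject) => {
    const out: number[] = [];
    stream.on('data', (n: number) => out.push(n));
    stream.on('end', () => resolve(out));
    stream.on('error', reject);
  });
}
```

When a page does contain items, the `push()` calls are what allow the stream internals to request more data, so the explicit `setImmediate` is only reached for the empty page here.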

}

this._reading = false;
}
);
}
}