refactor!: rewrite streaming logic #136
Conversation
I think this is completely fine; however, I would rather us pluck out the splitting and emitting of an array's members and put it into split-array-stream. The version of split-array-stream that can handle this exists as a prototype here: https://gist.github.com/stephenplusplus/888373161f80a45516be64e911a84db5. Assuming split-array-stream worked as it does in the above gist, would you have any objections to separating responsibility as I suggested? You had some concerns in the discussion on this issue, although I didn't fully understand them.
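For readers without the gist open, the responsibility being proposed for extraction is roughly "take a chunk that is an array and re-emit its members one at a time." A minimal, hypothetical sketch of that idea (not the actual split-array-stream code):

```ts
import {Transform, TransformCallback} from 'stream';

// Hypothetical illustration only, not the split-array-stream implementation:
// accept chunks that are arrays and re-emit their members one at a time.
class SplitArrays extends Transform {
  constructor() {
    super({objectMode: true});
  }
  _transform(chunk: unknown[], _encoding: string, callback: TransformCallback) {
    for (const item of chunk) {
      this.push(item);
    }
    callback();
  }
}
```

Piped in object mode, a chunk like `[1, 2, 3]` would come out as three separate 'data' events.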
@stephenplusplus part of the reason I think we should drop split is that it makes implementing flow control difficult. Looking at your gist, it looks like the new strategy would be to recursively call either [...]. I'm all for keeping it if we can make it work, but I'm not really sure what that is supposed to look like.
Isn't this backpressure? https://gist.github.com/stephenplusplus/888373161f80a45516be64e911a84db5#file-index-js-L21-L47 I copied the code from you! https://github.com/googleapis/nodejs-paginator/pull/136/files#diff-d6b7f8f4576a3b77584f738ea4b0258bR79
@stephenplusplus ~~I think the line in question is https://gist.github.com/stephenplusplus/888373161f80a45516be64e911a84db5#file-index-js-L44. Basically that check would indicate that the buffer is full, so I assume we call it again at the end of the current loop in hopes that it made some room. On the next call at least 1 item will be pushed into the stream before it is determined whether it actually has room for said item. I believe if the stream were to be paused it would continuously push items in. If only the [...]~~ I think I misread; it looks like it could work. Can you provide an example of how you would integrate with your prototype?
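For context on the check being discussed: `Readable#push()` returns `false` once the internal buffer is full, and that return value is the backpressure signal. A minimal sketch, with illustrative names only:

```ts
import {Readable} from 'stream';

// Illustrative only: push items until push() reports that the internal
// buffer is full. A false return value is the backpressure signal; the
// stream will call _read() again once the consumer drains the buffer.
function pushWhileRoom<T>(stream: Readable, items: T[]): boolean {
  let hasRoom = true;
  while (hasRoom && items.length > 0) {
    hasRoom = stream.push(items.shift());
  }
  return hasRoom; // false => stop and wait for the next _read()
}
```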
Yes sir, as a commit on this PR, or just pseudo?
Just in case you didn't see this part: I showed two ways you can use SAS, either by passing a function that resolves with an array (https://gist.github.com/stephenplusplus/888373161f80a45516be64e911a84db5#file-index-js-L126) or by creating a stream that emits arrays (https://gist.github.com/stephenplusplus/888373161f80a45516be64e911a84db5#file-index-js-L100).
@stephenplusplus I'm a little fuzzy on it, but if you want to take a crack at updating split and adding a commit to this PR, I'd be OK with it.
Codecov Report
```diff
@@          Coverage Diff           @@
##           master     #136   +/-  ##
=======================================
  Coverage     100%     100%
=======================================
  Files           1        2    +1
  Lines          98       97    -1
  Branches       19       20    +1
=======================================
- Hits           98       97    -1
```

Continue to review the full report at Codecov.
```ts
import {Transform} from 'stream';
import {ParsedArguments} from './';

interface ResourceEvents<T> {
```
What is the reason to have to define this normal event-emitter/stream stuff?
Otherwise the types for 'data' will just be any, and users would have to explicitly set a type instead of being able to infer one.
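A rough sketch of the pattern being described, with hypothetical names (the PR itself uses ResourceEvents<T> and ResourceStream<T>); declaration merging narrows the EventEmitter overloads so 'data' carries T:

```ts
import {Transform} from 'stream';

// Illustrative only: the typed 'data' overload comes first, followed by the
// general EventEmitter signature, so listeners get T instead of any.
interface TypedStream<T> {
  on(event: 'data', listener: (data: T) => void): this;
  on(event: string | symbol, listener: (...args: any[]) => void): this;
}
class TypedStream<T> extends Transform {}

// With this in place, `item` is inferred as T rather than any:
// new TypedStream<MyResource>().on('data', item => { /* item: MyResource */ });
```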
```ts
  parsedArguments: ParsedArguments,
  originalMethod: Function
): ResourceStream<T> {
  return new ResourceStream<T>(parsedArguments, originalMethod);
```
This is a rough draft attempt at an option for how this could look if split-array-stream changes go through:
```js
runAsStream_(parsedArguments, originalMethod) {
  let numMaxApiCalls =
    parsedArguments.maxApiCalls === -1 ? Infinity : parsedArguments.maxApiCalls;
  let numRequestsMade = 0;
  let numResultsToSend =
    parsedArguments.maxResults === -1 ? Infinity : parsedArguments.maxResults;
  let query = parsedArguments.query;
  const splitStream = new SplitArrayStream(getPage);

  // Resolves with one page of results; SplitArrayStream emits them one by one.
  function getPage() {
    return new Promise((resolve, reject) => {
      originalMethod(query, (err, results, nextQuery) => {
        if (err) {
          splitStream.destroy(err);
          return;
        }
        if (numResultsToSend !== Infinity) {
          results = results.splice(0, numResultsToSend);
          numResultsToSend -= results.length;
        }
        const didMakeMaxCalls = ++numRequestsMade >= numMaxApiCalls;
        if (didMakeMaxCalls) {
          // A trailing null tells the splitter to end the stream.
          results.push(null);
        } else {
          query = nextQuery;
        }
        resolve(results);
      });
    });
  }

  return splitStream;
}
```
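If it helps, a hypothetical sketch of how the stream returned by this draft might be consumed; `paginator` and `fakeApiMethod` are illustrative stand-ins, not names from this PR:

```ts
// Hypothetical consumer of the draft above. originalMethod is expected to
// call back with (err, results, nextQuery) for each page.
declare const paginator: {
  runAsStream_(args: object, method: Function): NodeJS.ReadableStream;
};
declare function fakeApiMethod(
  query: object,
  callback: (err: Error | null, results: unknown[], nextQuery?: object) => void
): void;

paginator
  .runAsStream_({query: {}, maxApiCalls: -1, maxResults: 10}, fakeApiMethod)
  .on('error', console.error)
  .on('data', item => console.log('item:', item))
  .on('end', () => console.log('done'));
```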
@stephenplusplus do you have a timeline of when this would be finished?
I can probably wrap up stephenplusplus/split-array-stream#4 tomorrow.
```ts
}

if (more && !this._ended) {
  setImmediate(() => this._read());
```
Do we need to call _read() again ourselves? As I understand it, the internals will ask for more data on their own (calling _read()), so the Readable implementor doesn't have to string a big series of reads together manually.
As I understand it, it will call _read when the user first requests data, and subsequent reads happen after the buffer was full and it is once again safe to continue reading. In this case, because the buffer is not full, if we don't call it again ourselves we'll end up in a state where it hangs forever, I believe.
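A minimal sketch of that pattern with hypothetical names, assuming each page is fetched asynchronously: _read() pushes one page and re-schedules itself while push() still reports room, since the internals only call _read() again after a full buffer drains:

```ts
import {Readable} from 'stream';

// Illustrative only: a paged Readable that re-schedules its own _read().
class PagedStream<T> extends Readable {
  private ended = false;
  private reading = false;

  constructor(private getPage: () => Promise<{items: T[]; done: boolean}>) {
    super({objectMode: true});
  }

  _read() {
    if (this.reading || this.ended) return;
    this.reading = true;
    this.getPage().then(
      ({items, done}) => {
        this.reading = false;
        let more = true;
        for (const item of items) {
          more = this.push(item);
        }
        if (done) {
          this.ended = true;
          this.push(null);
        } else if (more) {
          // Buffer still has room: keep pulling pages ourselves, because the
          // internals will not call _read() again until the buffer fills and
          // then drains.
          setImmediate(() => this._read());
        }
        // If more === false, the buffer is full; wait for the next _read().
      },
      err => this.destroy(err)
    );
  }
}
```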
Fixes #134