
Flatten with user-defined concurrency: flattenParallel #161

Closed
plmrry opened this issue Dec 30, 2016 · 10 comments

Comments


plmrry commented Dec 30, 2016

There is currently no middle ground between flattenConcurrently and flattenSequentially. It would be nice to be able to specify the desired amount of concurrency.

Here's a half-baked snippet that I call flattenParallel, which handles a variable number (n) of streams concurrently:

import xs from 'xstream';

// Runs at most (n + 1) inner streams at a time: note the `active > n`
// guard, which is why flattenParallel(0) is still sequential.
function flattenParallel(n) {
  return function(input$$) {
    const pending = [];
    let active = 0;
    return xs.create({
      start: out => {
        function onNext(input$) {
          if (active > n) pending.push(input$);
          else {
            active++;
            input$.addListener({
              next: item => out.next(item),
              error: err => out.error(err),
              complete: () => {
                active--;
                if (pending.length > 0) onNext(pending.shift());
              }
            });
          }
        }
        input$$.addListener({
          next: onNext,
          error: err => out.error(err)
        });
      },
      stop: () => {}
    });
  };
}

This provides a flexible abstraction of flattenSequentially and flattenConcurrently:

  • flattenSequentially is flattenParallel(0)
  • flattenConcurrently is flattenParallel(Infinity)
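
[Editor's note] The scheduling idea in the snippet can be seen without xstream at all. Below is a dependency-free sketch of the same bounded-concurrency queue; `runParallel`, the task signature, and all names are illustrative, not part of xstream:

```javascript
// Dependency-free sketch of the same bounded-concurrency queue.
// A "task" is a function (emit, done) => void that calls `done` when it
// finishes. As in the snippet above, the `active > n` guard means up to
// n + 1 tasks run at once.
function runParallel(n, tasks, onItem) {
  const pending = [];
  let active = 0;
  function start(task) {
    if (active > n) pending.push(task); // over the limit: queue it
    else {
      active++;
      task(onItem, () => {
        active--;
        if (pending.length > 0) start(pending.shift());
      });
    }
  }
  tasks.forEach(start);
}

// Demo: with n = 1, tasks a and b start immediately; c queues until one
// of them completes.
const log = [];
const dones = [];
const task = id => (emit, done) => { log.push('start ' + id); dones.push(done); };
runParallel(1, [task('a'), task('b'), task('c')], () => {});
// log is now ['start a', 'start b']; task c is pending
dones.shift()(); // complete task a
// log is now ['start a', 'start b', 'start c']
```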

kloy commented Jan 7, 2017

This is the #1 feature keeping me off xstream.

staltz (Owner) commented Jan 12, 2017

Apart from "it would be nice", have any of you encountered a real need for this operator when writing real code?

plmrry (Author) commented Jan 12, 2017

Can't it be used in any situation where you want to be able to specify concurrency?

For example: If you want to read 10,000 files, and you want to open them in parallel, say 20 at a time.

I wrote it to handle bulk API requests – I needed to make thousands of API calls, with some concurrency, but without hitting my rate limit.
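
[Editor's note] For the rate-limited bulk-call case, the same bound can be sketched with plain Promises and no stream library; `mapWithLimit` and its signature are illustrative, not an xstream API. A fixed pool of workers pulls from a shared index, so at most `limit` calls are in flight at once:

```javascript
// Promise-based sketch of bounded concurrency: run the async functions
// in `fns` with at most `limit` of them in flight at a time.
// `mapWithLimit` is an illustrative name, not part of xstream.
async function mapWithLimit(limit, fns) {
  const results = new Array(fns.length);
  let next = 0;
  async function worker() {
    while (next < fns.length) {
      const i = next++;            // claim the next index before awaiting
      results[i] = await fns[i]();
    }
  }
  // One worker per concurrency slot; each finishes only when no work remains.
  const workers = Array.from({ length: Math.min(limit, fns.length) }, worker);
  await Promise.all(workers);
  return results;
}

// Example: four "API calls", at most two in flight at a time.
mapWithLimit(2, [1, 2, 3, 4].map(x => async () => x * 10))
  .then(results => console.log(results)); // [10, 20, 30, 40]
```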

staltz (Owner) commented Jan 12, 2017

> For example: If you want to read 10,000 files, and you want to open them in parallel, say 20 at a time.

I'd say that's a use case for Node Streams.

> I wrote it to handle bulk API requests – I needed to make thousands of API calls, with some concurrency, but without hitting my rate limit.

From a client? Sounds like the job of an intermediate backend.

I'm getting to a point here, which is: xstream is meant for frontend programming, specifically to support Cycle.js. I find it hard to see a use for flattenParallel in that context.

plmrry (Author) commented Jan 12, 2017

I didn't realize the project had such a narrow focus.

Is there any reason xstream cannot or should not be used in a backend?


kloy commented Jan 12, 2017

@staltz Thank you for the replies.

Here are my use cases, which are all from the browser client.

  1. Uploading large files (up to 20 GB). We have logic to split a zip into chunks, verify each chunk, and upload the chunks using range requests. We can speed up each portion of this process by doing them in parallel, up to a limit. Due to the number of chunks, we do not want to upload them all at once, as that would run into browser locks and prevent other, potentially higher-priority requests from occurring.

  2. Downloading large PDFs for PDF.js. We have implemented a prefetching strategy for PDFs, since a portion of our app involves navigating through a list of PDFs. Again, we need the ability to prioritize requests so that mutation operations on a document are not blocked by prefetching a PDF that is not in focus. At the same time, fetching multiple chunks at once can significantly improve transfer speed.

staltz (Owner) commented Jan 12, 2017

> Is there any reason xstream cannot or should not be used in a backend?

Because there are better options. xstream, with its very small size, is clearly meant for browsers, because kB size doesn't matter much in Node.js backends. most.js is almost always a better choice in that case, given its very high performance in Node.js.

> Here are my use cases, which are all from the browser client.
> 1. Uploading large files (up to 20 GB). We have logic to split a zip into chunks, verify each chunk, and upload the chunks using range requests. We can speed up each portion of this process by doing them in parallel up to a limit. Due to the amount of chunks we do not want to upload all at once as it would run into browser locks and prevent other, potentially higher priority requests from occurring.
> 2. Downloading large pdfs for PDFjs. We have implemented a strategy around prefetching PDFs since a portion of our app is navigating through a list of PDFs. Again, we need the ability to prioritize requests so mutation operations on a document are not blocked by prefetching a pdf that is not in focus. At the same time, fetching multiple chunks at once can significantly improve transfer speed.

Really good use cases to report. I wouldn't have been able to predict these types of use on my own. Thanks. I'll consider adding this feature, then. It would probably be named flattenConcurrentlyAtMost(n).

xtianjohns (Contributor) commented

@staltz, were you thinking of adding this as an extra vs. in core?

staltz (Owner) commented Feb 19, 2017

@xtianjohns as an extra, named flattenConcurrentlyAtMost(n). By the way, it's worth noting that new development should happen on the next branch, not master, since we're going to release Cycle Unified soon, using xstream on next branch (which uses TypeScript 2.1).

xtianjohns (Contributor) commented Jul 24, 2018

I've got this implemented, but the tests are pretty thin: I just copied the tests for flattenConcurrently and flattenSequentially and substituted flattenConcurrentlyAtMost(Infinity) and flattenConcurrentlyAtMost(0) respectively, then added a couple more to verify the "throttling" behavior.

As I was working on this, I realized that it's a bit odd to think of ...atMost(0) as meaning "only listen to 1 stream at a time". Conceptually, the number of streams the operator will imitate will always be n + 1, not n. I don't know if this is ideal or not, but changing the behavior in my implementation only requires changing a <= to a <, so... 🤷‍♂️

PR incoming.

Also, this is a heck of an operator to write docs for, haha. I'll welcome any feedback about how to make it clearer.

xtianjohns added a commit to xtianjohns/xstream that referenced this issue Jul 24, 2018
Add flattenSequentiallyAtMost(n) extra.

flattenSequentiallyAtMost is designed to provide consumer-configurable
concurrency to flattening operations. Two flattening extras exist which
allow consumers to flatten a meta stream with maximum concurrency, or
with no concurrency. This new operator supports a concurrency limit,
representing the maximum amount of _additional_ streams to connect to
during flattening.

Resolve staltz#161.
xtianjohns added a commit to xtianjohns/xstream that referenced this issue Jul 24, 2018
Add flattenConcurrentlyAtMost(n) extra.

xtianjohns added a commit to xtianjohns/xstream that referenced this issue Jan 9, 2019
Add flattenConcurrentlyAtMost(n) extra.

staltz pushed a commit that referenced this issue Jan 10, 2019
Add flattenConcurrentlyAtMost(n) extra.