Lossless throttle behavior for RxJS.
losslessThrottle(delay, scheduler)
Re-emits ALL input events separated by at least delay
using optional
scheduler
(async by default)
IN: a----b-c------def------gh|
OUT: a----b---c----d---e---f---g---(h|)
Use npm
or yarn
npm install --save rx-op-lossless-throttle
yarn add rx-op-lossless-throttle
throttleOp(windowSizeInMsOrConfig, scheduler = null)
Config is a hash with the following keys:
windowSize [number]
- size of the window in ms (or scheduler frames);bufferSize [number|null]
- limit on the number of buffered items (null - no limit);raiseOnOverflow [boolean]
-true
- operator raises an error on overflow;false
- incoming items are silently discarded on overflow.
Note that with bufferSize: 0
and raiseOnOverflow: false
the behavior will
match the standard throttle
operator.
For example:
import { throttleLet, throttleOp } from "rx-op-lossless-throttle";
out$ = in$.let(throttleLet(100));
out$ = in$::throttleOp(100);
out$ = in$::throttleOp({
windowSize: 100,
bufferSize: 10,
raiseOnOverflow: true
})
or, if you're feeling lucky:
Observable.prototype.losslessThrottle = throttleOp;
out$ = in$.losslsssThrottle(100);
I had a piece of ETL code which did something like this:
producer API call, returns paginated results ->
several batches of ~100 items ->
basic processing ->
stream of independent items ->
consumer API, accepts a single item
See the problem here? Yes, one producer API call would result in 100 calls to the consumer API.
Now, in a perfect world:
- the consumer API would have proper throttling and it would scale to handle load spikes transparenty;
- or the consumer API would return a proper error code we'd be able to detect and re-try submission;
- or the consumer API would allow us to submit data in batches;
- or something else which wouldn't require us to deal with this problem in ETL logic.
The problem is we don't live in a perfect world and we don't have control over the consumer. On the other side, separating consumer API calls by 50-100ms eliminated the "consumer chokes on the spike" problem. By any means, it's not a perfect solution, but it works and gives time to find a better one.
Absolutely correct. You may (and you probably should) limit the buffer
size with bufferSize
configuration to prevent OOM. There's no
default limit as I cannot guess what kind of data you're processing,
how much memory you can spare on the buffer and how important the data
is.
- rx-op-lossless-throttle
- Lossless throttle behavior for RxJS.
- rx-op-sliding-window
- "Smooth" sliding window operator for RxJS
- rx-op-debounce-throttle
- A hybrid debounce + throttle operator for RxJS