Skip to content

Commit

Permalink
fix(zip): zip now accepts an array of arguments like its counterparts
Browse files Browse the repository at this point in the history
`forkJoin` and `combineLatest` both allow an array of arguments to be passed to it like so: `forkJoin([a$, b$, c$])`. Now `zip` does the same: `zip([a$, b$, c$])`

BREAKING CHANGE: Zipping a single array will now have a different result. This is an extreme corner-case, because it is very unlikely that anyone would want to zip an array with nothing at all.
  • Loading branch information
benlesh committed Sep 27, 2020
1 parent b3f5cf8 commit 3123b67
Showing 1 changed file with 35 additions and 41 deletions.
76 changes: 35 additions & 41 deletions src/internal/observable/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { Observable } from '../Observable';
import { ObservableInput, ObservedValueOf } from '../types';
import { Subscription } from '../Subscription';
import { from } from './from';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { EMPTY } from './empty';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';

/* tslint:disable:max-line-length */
/** @deprecated resultSelector is no longer supported, pipe to map instead */
Expand Down Expand Up @@ -183,47 +186,38 @@ export function zip<O extends ObservableInput<any>, R>(
resultSelector = sources.pop() as typeof resultSelector;
}

return new Observable<ObservedValueOf<O>[]>((subscriber) => {
const buffers: ObservedValueOf<O>[][] = sources.map(() => []);
const completed = sources.map(() => false);
const subscription = new Subscription();
sources = argsOrArgArray(sources);

const tryEmit = () => {
if (buffers.every((buffer) => buffer.length > 0)) {
let result: any = buffers.map((buffer) => buffer.shift()!);
if (resultSelector) {
try {
result = resultSelector(...result);
} catch (err) {
subscriber.error(err);
return;
}
}
subscriber.next(result);
if (buffers.some((buffer, i) => buffer.length === 0 && completed[i])) {
subscriber.complete();
}
}
};
return sources.length
? new Observable<ObservedValueOf<O>[]>((subscriber) => {
const buffers: ObservedValueOf<O>[][] = sources.map(() => []);
const completed = sources.map(() => false);
const subscription = new Subscription();

for (let i = 0; !subscriber.closed && i < sources.length; i++) {
const source = from(sources[i]);
subscription.add(
source.subscribe({
next: (value) => {
buffers[i].push(value);
tryEmit();
},
error: (err) => subscriber.error(err),
complete: () => {
completed[i] = true;
if (buffers[i].length === 0) {
subscriber.complete();
}
},
})
);
}
return subscription;
});
for (let i = 0; !subscriber.closed && i < sources.length; i++) {
const source = from(sources[i]);
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
buffers[i].push(value);
if (buffers.every((buffer) => buffer.length)) {
let result: any = buffers.map((buffer) => buffer.shift()!);
subscriber.next(resultSelector ? resultSelector(...result) : result);
if (buffers.some((buffer, i) => !buffer.length && completed[i])) {
subscriber.complete();
}
}
},
undefined,
() => {
completed[i] = true;
!buffers[i].length && subscriber.complete();
}
)
);
}
return subscription;
})
: EMPTY;
}

0 comments on commit 3123b67

Please sign in to comment.