Skip to content

Commit

Permalink
feat: RxJS now supports first-class interop with AsyncIterables
Browse files Browse the repository at this point in the history
* feat: Observables support for await

* feat: convert async iterable to observable

* chore: update side-effects expectations

* fix: make sure thrown errors in for await loop clean up subscription

* chore: update side-effects assertions
  • Loading branch information
benlesh authored Apr 2, 2020
1 parent e69b765 commit 4fa9d01
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 20 deletions.
2 changes: 1 addition & 1 deletion integration/side-effects/snapshots/esm/ajax.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@

import "tslib";
2 changes: 1 addition & 1 deletion integration/side-effects/snapshots/esm/fetch.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@

import "tslib";
2 changes: 2 additions & 0 deletions integration/side-effects/snapshots/esm/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "tslib";

var NotificationKind;

(function(NotificationKind) {
Expand Down
2 changes: 2 additions & 0 deletions integration/side-effects/snapshots/esm/operators.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "tslib";

var NotificationKind;

(function(NotificationKind) {
Expand Down
2 changes: 2 additions & 0 deletions integration/side-effects/snapshots/esm/testing.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "tslib";

var NotificationKind;

(function(NotificationKind) {
Expand Down
2 changes: 2 additions & 0 deletions integration/side-effects/snapshots/esm/websocket.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "tslib";

var NotificationKind;

(function(NotificationKind) {
Expand Down
34 changes: 20 additions & 14 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 56 additions & 2 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { Observer, TeardownLogic } from '../src/internal/types';
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip } from 'rxjs/operators';
import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty, interval } from 'rxjs';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, take, finalize } from 'rxjs/operators';

declare const asDiagram: any, rxTestScheduler: any;

Expand Down Expand Up @@ -946,3 +946,57 @@ describe('Observable.lift', () => {
}
});
});

if (Symbol && Symbol.asyncIterator) {
describe('async iterator support', () => {
it('should work for sync observables', async () => {
const source = of(1, 2, 3);
const results: number[] = [];
for await (const value of source) {
results.push(value);
}
expect(results).to.deep.equal([1, 2, 3]);
});

it('should throw if the observable errors', async () => {
const source = throwError(new Error('bad'));
let error: any;
try {
for await (const _ of source) {
// do nothing
}
} catch (err) {
error = err;
}
expect(error).to.be.an.instanceOf(Error);
expect(error.message).to.equal('bad');
});

it('should support async observables', async () => {
const source = interval(10).pipe(take(3));
const results: number[] = [];
for await (const value of source) {
results.push(value);
}
expect(results).to.deep.equal([0, 1, 2]);
});

it('should do something clever if the loop exits', async () => {
let finalized = false;
const source = interval(10).pipe(take(10), finalize(() => finalized = true));
const results: number[] = [];
try {
for await (const value of source) {
results.push(value);
if (value === 1) {
throw new Error('bad');
}
}
} catch (err) {
// ignore
}
expect(results).to.deep.equal([0, 1]);
expect(finalized).to.be.true;
});
});
}
28 changes: 28 additions & 0 deletions spec/observables/from-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,34 @@ describe('from', () => {
{ name: 'arguments', value: getArguments('x') },
];

if (Symbol && Symbol.asyncIterator) {
const fakeAsyncIterator = (...values: any[]) => {
return {
[Symbol.asyncIterator]() {
let i = 0;
return {
next() {
const index = i++;
if (index < values.length) {
return Promise.resolve({ done: false, value: values[index] });
} else {
return Promise.resolve({ done: true });
}
},
[Symbol.asyncIterator]() {
return this;
}
};
}
};
};

sources.push({
name: 'async-iterator',
value: fakeAsyncIterator('x')
});
}

for (const source of sources) {
it(`should accept ${source.name}`, (done) => {
let nextInvoked = false;
Expand Down
22 changes: 22 additions & 0 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { throwError } from './observable/throwError';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
import { asyncIteratorFrom } from './asyncIteratorFrom';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
Expand Down Expand Up @@ -391,3 +392,24 @@ function getPromiseCtor(promiseCtor: PromiseConstructorLike | undefined) {

return promiseCtor;
}

export interface Observable<T> {
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
}

(function () {
/**
* We only add this symbol if the runtime supports it.
* Adding this adds support for subscribing to observables
* via `for await(const value of source$) {}`
*
* This passes muster in Node 9, which does not support
* async iterators. As well as working in Node 12, which does
* support the symbol.
*/
if (Symbol && Symbol.asyncIterator) {
Observable.prototype[Symbol.asyncIterator] = function () {
return asyncIteratorFrom(this);
};
}
})();
62 changes: 62 additions & 0 deletions src/internal/asyncIteratorFrom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { Observable } from './Observable';
import { Deferred } from './util/deferred';

export function asyncIteratorFrom<T>(source: Observable<T>) {
return coroutine(source);
}

async function* coroutine<T>(source: Observable<T>) {
const deferreds: Deferred<IteratorResult<T>>[] = [];
const values: T[] = [];
let hasError = false;
let error: any = null;
let completed = false;

const subs = source.subscribe({
next: value => {
if (deferreds.length > 0) {
deferreds.shift()!.resolve({ value, done: false });
} else {
values.push(value);
}
},
error: err => {
hasError = true;
error = err;
while (deferreds.length > 0) {
deferreds.shift()!.reject(err);
}
},
complete: () => {
completed = true;
while (deferreds.length > 0) {
deferreds.shift()!.resolve({ value: undefined, done: true });
}
},
});

try {
while (true) {
if (values.length > 0) {
yield values.shift();
} else if (completed) {
return;
} else if (hasError) {
throw error;
} else {
const d = new Deferred<IteratorResult<T>>();
deferreds.push(d);
const result = await d.promise;
if (result.done) {
return;
} else {
yield result.value;
}
}
}
} catch (err) {
throw err;
} finally {
subs.unsubscribe();
}
}
28 changes: 28 additions & 0 deletions src/internal/scheduled/scheduleAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';

export function scheduleAsyncIterable<T>(input: AsyncIterable<T>, scheduler: SchedulerLike) {
if (!input) {
throw new Error('Iterable cannot be null');
}
return new Observable<T>(subscriber => {
const sub = new Subscription();
sub.add(
scheduler.schedule(() => {
const iterator = input[Symbol.asyncIterator]();
sub.add(scheduler.schedule(function () {
iterator.next().then(result => {
if (result.done) {
subscriber.complete();
} else {
subscriber.next(result.value);
this.schedule();
}
});
}));
})
);
return sub;
});
}
4 changes: 3 additions & 1 deletion src/internal/scheduled/scheduled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { isArrayLike } from '../util/isArrayLike';
import { isIterable } from '../util/isIterable';
import { ObservableInput, SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';

/**
* Converts from a common {@link ObservableInput} type to an observable where subscription and emissions
Expand All @@ -30,8 +31,9 @@ export function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike
return scheduleArray(input, scheduler);
} else if (isIterable(input) || typeof input === 'string') {
return scheduleIterable(input, scheduler);
} else if (Symbol && Symbol.asyncIterator && typeof (input as any)[Symbol.asyncIterator] === 'function') {
return scheduleAsyncIterable(input as any, scheduler);
}
}

throw new TypeError((input !== null && typeof input || input) + ' is not observable');
}
2 changes: 1 addition & 1 deletion src/internal/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface Subscribable<T> {
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable;
}

export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;
export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T> | AsyncIterableIterator<T>;

/** @deprecated use {@link InteropObservable } */
export type ObservableLike<T> = InteropObservable<T>;
Expand Down
8 changes: 8 additions & 0 deletions src/internal/util/deferred.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export class Deferred<T> {
resolve: (value?: T | PromiseLike<T> | undefined) => void = null!;
reject: (reason?: any) => void = null!;
promise = new Promise<T>((a, b) => {
this.resolve = a;
this.reject = b;
});
}
6 changes: 6 additions & 0 deletions src/internal/util/subscribeTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { iterator as Symbol_iterator } from '../symbol/iterator';
import { observable as Symbol_observable } from '../symbol/observable';
import { Subscription } from '../Subscription';
import { Subscriber } from '../Subscriber';
import { subscribeToAsyncIterable } from './subscribeToAsyncIterable';

export const subscribeTo = <T>(result: ObservableInput<T>): (subscriber: Subscriber<T>) => Subscription | void => {
if (!!result && typeof (result as any)[Symbol_observable] === 'function') {
Expand All @@ -20,6 +21,11 @@ export const subscribeTo = <T>(result: ObservableInput<T>): (subscriber: Subscri
return subscribeToPromise(result);
} else if (!!result && typeof (result as any)[Symbol_iterator] === 'function') {
return subscribeToIterable(result as any);
} else if (
Symbol && Symbol.asyncIterator &&
!!result && typeof (result as any)[Symbol.asyncIterator] === 'function'
) {
return subscribeToAsyncIterable(result as any);
} else {
const value = isObject(result) ? 'an invalid object' : `'${result}'`;
const msg = `You provided ${value} where a stream was expected.`
Expand Down
14 changes: 14 additions & 0 deletions src/internal/util/subscribeToAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Subscriber } from '../Subscriber';

export function subscribeToAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
return (subscriber: Subscriber<T>) => {
process(asyncIterable, subscriber).catch(err => subscriber.error(err));
};
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
}
subscriber.complete();
}

0 comments on commit 4fa9d01

Please sign in to comment.