Skip to content

Commit

Permalink
Merge pull request #462 from afshin/stream
Browse files Browse the repository at this point in the history
Add async iterable `Stream` class that inherits from `Signal`
  • Loading branch information
afshin authored Nov 11, 2022
2 parents 01df8d2 + 4735392 commit ce0ce76
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/signaling/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
},
"dependencies": {
"@lumino/algorithm": "^2.0.0-alpha.6",
"@lumino/coreutils": "^2.0.0-alpha.6",
"@lumino/properties": "^2.0.0-alpha.6"
},
"devDependencies": {
Expand Down
66 changes: 62 additions & 4 deletions packages/signaling/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
| The full license is in the file LICENSE, distributed with this software.
|----------------------------------------------------------------------------*/
import { ArrayExt, find } from '@lumino/algorithm';
import { PromiseDelegate } from '@lumino/coreutils';
import { AttachedProperty } from '@lumino/properties';

/**
Expand Down Expand Up @@ -84,6 +85,11 @@ export interface ISignal<T, U> {
disconnect(slot: Slot<T, U>, thisArg?: any): boolean;
}

/**
* An object that is both a signal and an async iterable.
*/
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterable<U> {}

/**
* A concrete implementation of `ISignal`.
*
Expand Down Expand Up @@ -157,11 +163,11 @@ export class Signal<T, U> implements ISignal<T, U> {
* @param fn The callback during which the signal is blocked
*/
block(fn: () => void): void {
this._blockedCount++;
this.blocked++;
try {
fn();
} finally {
this._blockedCount--;
this.blocked--;
}
}

Expand Down Expand Up @@ -204,12 +210,15 @@ export class Signal<T, U> implements ISignal<T, U> {
* Exceptions thrown by connected slots will be caught and logged.
*/
emit(args: U): void {
if (!this._blockedCount) {
if (!this.blocked) {
Private.emit(this, args);
}
}

private _blockedCount = 0;
/**
* If `blocked` is not `0`, the signal will not emit.
*/
protected blocked = 0;
}

/**
Expand Down Expand Up @@ -338,10 +347,59 @@ export namespace Signal {
}
}

/**
* A stream with the characteristics of a signal and an async iterable.
*/
export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
/**
* Return an async iterator that yields every emission.
*/
async *[Symbol.asyncIterator](): AsyncIterableIterator<U> {
let pending = this._pending;
while (true) {
try {
const { args, next } = await pending.promise;
pending = next;
yield args;
} catch (_) {
return; // Any promise rejection stops the iterator.
}
}
}

/**
* Emit the signal, invoke the connected slots, and yield the emission.
*
* @param args - The args to pass to the connected slots.
*/
emit(args: U): void {
if (!this.blocked) {
const pending = this._pending;
this._pending = new PromiseDelegate();
pending.resolve({ args, next: this._pending });
super.emit(args);
}
}

/**
* Stop the stream's async iteration.
*/
stop(): void {
this._pending.reject('stop');
}

private _pending: Private.Pending<U> = new PromiseDelegate();
}

/**
* The namespace for the module implementation details.
*/
namespace Private {
/**
* A pending promise in a promise chain underlying a stream.
*/
export type Pending<U> = PromiseDelegate<{ args: U; next: Pending<U> }>;

/**
* The signal exception handler function.
*/
Expand Down
132 changes: 130 additions & 2 deletions packages/signaling/tests/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
|----------------------------------------------------------------------------*/
import { expect } from 'chai';

import { Signal } from '@lumino/signaling';
import { Signal, Stream } from '@lumino/signaling';

class TestObject {
readonly one = new Signal<this, void>(this);

readonly two = new Signal<this, number>(this);

readonly three = new Signal<this, string[]>(this);
readonly three = new Stream<this, string[]>(this);
}

class ExtendedObject extends TestObject {
Expand Down Expand Up @@ -582,4 +582,132 @@ describe('@lumino/signaling', () => {
});
});
});

describe('Stream', () => {
describe('#[Symbol.asyncIterator]()', () => {
it('should yield emissions and respect blocking', async () => {
const stream = new Stream<unknown, string>({});
const input = 'async';
const expected = 'aINTERRUPTEDsync';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('I');
stream.emit('N');
stream.emit('T');
stream.emit('E');
stream.emit('R');
stream.emit('R');
stream.emit('U');
stream.emit('P');
stream.emit('T');
stream.emit('E');
stream.emit('D');
}
});
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
wait.then(() => stream.stop());
for await (const letter of stream) {
emitted = emitted.concat(letter);
}
expect(emitted).to.equal(expected);
});

it('should return an async iterator', async () => {
const stream = new Stream<unknown, string>({});
const input = 'iterator';
const expected = 'iAHEMterator';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('A');
stream.emit('H');
stream.emit('E');
stream.emit('M');
}
});
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
wait.then(() => stream.stop());

const it = stream[Symbol.asyncIterator]();
let emission: IteratorResult<string, any>;
while (!(emission = await it.next()).done) {
emitted = emitted.concat(emission.value);
}

expect(emitted).to.equal(expected);
});
});

describe('#stop()', () => {
it('should stop emissions in the async interable', async () => {
const stream = new Stream<unknown, string>({});
const input = 'continuing';
const expected = 'cINTERRUPTEDontinuing';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('I');
stream.emit('N');
stream.emit('T');
stream.emit('E');
stream.emit('R');
stream.emit('R');
stream.emit('U');
stream.emit('P');
stream.emit('T');
stream.emit('E');
stream.emit('D');
}
});
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.stop());
for await (const letter of stream) {
emitted = emitted.concat(letter);
}
expect(emitted).to.equal(expected);
});

it('should resolve to `done` in an async iterator', async () => {
const stream = new Stream<unknown, string>({});
const input = 'stopiterator';
const expected = 'sAHEMtopiterator';
const wait = Promise.resolve();
let emitted = '';
let once = true;
stream.connect(() => {
if (once) {
once = false;
stream.emit('A');
stream.emit('H');
stream.emit('E');
stream.emit('M');
}
});
input.split('').forEach(x => wait.then(() => stream.emit(x)));
wait.then(() => stream.stop());

const it = stream[Symbol.asyncIterator]();
let emission: IteratorResult<string, any>;
while (!(emission = await it.next()).done) {
emitted = emitted.concat(emission.value);
}

expect(emitted).to.equal(expected);
});
});
});
});
12 changes: 12 additions & 0 deletions review/api/signaling.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ export interface ISignal<T, U> {
disconnect(slot: Slot<T, U>, thisArg?: any): boolean;
}

// @public
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterable<U> {
}

// @public
export class Signal<T, U> implements ISignal<T, U> {
constructor(sender: T);
block(fn: () => void): void;
protected blocked: number;
connect(slot: Slot<T, U>, thisArg?: unknown): boolean;
disconnect(slot: Slot<T, U>, thisArg?: unknown): boolean;
emit(args: U): void;
Expand All @@ -37,6 +42,13 @@ export namespace Signal {
// @public
export type Slot<T, U> = (sender: T, args: U) => void;

// @public
export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
[Symbol.asyncIterator](): AsyncIterableIterator<U>;
emit(args: U): void;
stop(): void;
}

// (No @packageDocumentation comment for this package)

```

0 comments on commit ce0ce76

Please sign in to comment.