Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kbn/optimizer] emit success event from reducer when all bundles cached #57945

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 198 additions & 29 deletions packages/kbn-optimizer/src/common/event_stream_helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,221 @@
*/

import * as Rx from 'rxjs';
import { toArray } from 'rxjs/operators';

import { summarizeEvent$ } from './event_stream_helpers';

it('emits each state with each event, ignoring events when reducer returns undefined', async () => {
const values = await summarizeEvent$(
Rx.of(1, 2, 3, 4, 5),
{
sum: 0,
},
(state, event) => {
if (event % 2) {
return {
sum: state.sum + event,
};
}
import { toArray, take } from 'rxjs/operators';

import { summarizeEventStream } from './event_stream_helpers';

it('emits each state with each event, ignoring events when summarizer returns undefined', async () => {
const event$ = Rx.of(1, 2, 3, 4, 5);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event) => {
if (event % 2) {
return state + event;
}
})
.pipe(toArray())
.toPromise();

expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 3,
"state": 4,
},
Object {
"event": 5,
"state": 9,
},
]
`);
});

it('interleaves injected events when source is synchronous', async () => {
const event$ = Rx.of(1, 7);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => {
if (event < 5) {
injectEvent(event + 2);
}

return state + event;
})
.pipe(toArray())
.toPromise();

expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 3,
"state": 4,
},
Object {
"event": 5,
"state": 9,
},
Object {
"event": 7,
"state": 16,
},
]
`);
});

it('interleaves injected events when source is asynchronous', async () => {
const event$ = Rx.of(1, 7, Rx.asyncScheduler);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => {
if (event < 5) {
injectEvent(event + 2);
}
)

return state + event;
})
.pipe(toArray())
.toPromise();

expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": Object {
"sum": 0,
},
"state": 0,
},
Object {
"event": 1,
"state": Object {
"sum": 1,
},
"state": 1,
},
Object {
"event": 3,
"state": Object {
"sum": 4,
},
"state": 4,
},
Object {
"event": 5,
"state": Object {
"sum": 9,
},
"state": 9,
},
Object {
"event": 7,
"state": 16,
},
]
`);
});

it('interleaves mulitple injected events in order', async () => {
const event$ = Rx.of(1);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => {
if (event < 10) {
injectEvent(10);
injectEvent(20);
injectEvent(30);
}

return state + event;
})
.pipe(toArray())
.toPromise();

expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 10,
"state": 11,
},
Object {
"event": 20,
"state": 31,
},
Object {
"event": 30,
"state": 61,
},
]
`);
});

it('stops an infinite stream when unsubscribed', async () => {
const event$ = Rx.of(1);
const initial = 0;
const summarize = jest.fn((prev, event, injectEvent) => {
// always inject a follow up event, making this infinite and synchronous
injectEvent(event + 1);
return prev + event;
});

const values = await summarizeEventStream(event$, initial, summarize)
.pipe(take(11), toArray())
.toPromise();

expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 2,
"state": 3,
},
Object {
"event": 3,
"state": 6,
},
Object {
"event": 4,
"state": 10,
},
Object {
"event": 5,
"state": 15,
},
Object {
"event": 6,
"state": 21,
},
Object {
"event": 7,
"state": 28,
},
Object {
"event": 8,
"state": 36,
},
Object {
"event": 9,
"state": 45,
},
Object {
"event": 10,
"state": 55,
},
]
`);

// ensure summarizer still only called 10 times after a timeout
expect(summarize).toHaveBeenCalledTimes(10);
await new Promise(resolve => setTimeout(resolve, 1000));
expect(summarize).toHaveBeenCalledTimes(10);
});
92 changes: 72 additions & 20 deletions packages/kbn-optimizer/src/common/event_stream_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,91 @@
*/

import * as Rx from 'rxjs';
import { scan, distinctUntilChanged, startWith } from 'rxjs/operators';

export interface Update<Event, State> {
event?: Event;
state: State;
}

export type Summarizer<Event, State> = (prev: State, event: Event) => State | undefined;
export type EventInjector<Event> = (event: Event) => void;
export type Summarizer<Event, State> = (
prev: State,
event: Event,
injectEvent: EventInjector<Event>
) => State | undefined;

/**
* Transform an event stream into a state update stream which emits
* the events and individual states for each event.
*/
export const summarizeEvent$ = <Event, State>(
export const summarizeEventStream = <Event, State>(
event$: Rx.Observable<Event>,
initialState: State,
reducer: Summarizer<Event, State>
summarize: Summarizer<Event, State>
) => {
const initUpdate: Update<Event, State> = {
state: initialState,
};

return event$.pipe(
scan((prev, event): Update<Event, State> => {
const newState = reducer(prev.state, event);
return newState === undefined
? prev
: {
return new Rx.Observable<Update<Event, State>>(subscriber => {
const eventBuffer: Event[] = [];

let processingEventBuffer = false;
let eventStreamComplete = false;
let previousState = initialState;

const injectEvent = (nextEvent: Event) => {
eventBuffer.push(nextEvent);

if (processingEventBuffer) {
return;
}

try {
processingEventBuffer = true;

while (eventBuffer.length && !subscriber.closed) {
const event = eventBuffer.shift()!;
const nextState = summarize(previousState, event, injectEvent);

if (nextState === undefined) {
// skip this event
continue;
}

// emit state update
previousState = nextState;
subscriber.next({
event,
state: newState,
};
}, initUpdate),
distinctUntilChanged(),
startWith(initUpdate)
);
state: nextState,
});
}

if (eventStreamComplete) {
subscriber.complete();
}
} catch (error) {
subscriber.error(error);
} finally {
processingEventBuffer = false;
}
};

// send initial "update"
subscriber.next({
state: initialState,
});

// inject all subsequent events to the internal eventBuffer
subscriber.add(
event$.subscribe(
injectEvent,
error => {
subscriber.error(error);
},
() => {
eventStreamComplete = true;
if (!processingEventBuffer && eventBuffer.length === 0) {
subscriber.complete();
}
}
)
);
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ it('uses cache on second run and exist cleanly', async () => {
"initializing",
"initializing",
"initialized",
"success",
]
`);
});
Loading