From a500d4f94a67ba2d046e4a6234935811efaae0e8 Mon Sep 17 00:00:00 2001 From: spalger Date: Tue, 18 Feb 2020 17:56:30 -0700 Subject: [PATCH 1/3] emit success event from reducer when all bundles cached --- .../src/common/event_stream_helpers.test.ts | 152 +++++++++++++++--- .../src/common/event_stream_helpers.ts | 90 ++++++++--- .../basic_optimization.test.ts | 1 + .../kbn-optimizer/src/log_optimizer_state.ts | 18 ++- .../optimizer/handle_optimizer_completion.ts | 5 - .../src/optimizer/optimizer_reducer.ts | 19 ++- packages/kbn-optimizer/src/run_optimizer.ts | 4 +- 7 files changed, 229 insertions(+), 60 deletions(-) diff --git a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts index 60982abff2d87..3b6193e9e417a 100644 --- a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts +++ b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts @@ -20,49 +20,151 @@ import * as Rx from 'rxjs'; import { toArray } from 'rxjs/operators'; -import { summarizeEvent$ } from './event_stream_helpers'; +import { summarizeEventStream } from './event_stream_helpers'; it('emits each state with each event, ignoring events when reducer returns undefined', async () => { - const values = await summarizeEvent$( - Rx.of(1, 2, 3, 4, 5), - { - sum: 0, - }, - (state, event) => { - if (event % 2) { - return { - sum: state.sum + event, - }; - } + const event$ = Rx.of(1, 2, 3, 4, 5); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event) => { + if (event % 2) { + return state + event; } - ) + }) .pipe(toArray()) .toPromise(); expect(values).toMatchInlineSnapshot(` Array [ Object { - "state": Object { - "sum": 0, - }, + "state": 0, }, Object { "event": 1, - "state": Object { - "sum": 1, - }, + "state": 1, }, Object { "event": 3, - "state": Object { - "sum": 4, - }, + "state": 4, }, Object { "event": 5, - "state": Object { - "sum": 9, - }, + "state": 9, + }, + ] + `); +}); + +it('interleaves injected events when source is synchronous', async () => { + const event$ = Rx.of(1, 7); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => { + if (event < 5) { + injectEvent(event + 2); + } + + return state + event; + }) + .pipe(toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 3, + "state": 4, + }, + Object { + "event": 5, + "state": 9, + }, + Object { + "event": 7, + "state": 16, + }, + ] + `); +}); + +it('interleaves injected events when source is asynchronous', async () => { + const event$ = Rx.of(1, 7, Rx.asyncScheduler); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => { + if (event < 5) { + injectEvent(event + 2); + } + + return state + event; + }) + .pipe(toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 3, + "state": 4, + }, + Object { + "event": 5, + "state": 9, + }, + Object { + "event": 7, + "state": 16, + }, + ] + `); +}); + +it('interleaves mulitple injected events in order', async () => { + const event$ = Rx.of(1); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => { + if (event < 10) { + injectEvent(10); + injectEvent(20); + injectEvent(30); + } + + return state + event; + }) + .pipe(toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 10, + "state": 11, + }, + Object { + "event": 20, + "state": 31, + }, + Object { + "event": 30, + "state": 61, }, ] `); diff --git a/packages/kbn-optimizer/src/common/event_stream_helpers.ts b/packages/kbn-optimizer/src/common/event_stream_helpers.ts index c1585f79ede6e..e36e9c77ebd13 100644 --- a/packages/kbn-optimizer/src/common/event_stream_helpers.ts +++ b/packages/kbn-optimizer/src/common/event_stream_helpers.ts @@ -18,39 +18,91 @@ */ import * as Rx from 'rxjs'; -import { scan, distinctUntilChanged, startWith } from 'rxjs/operators'; export interface Update { event?: Event; state: State; } -export type Summarizer = (prev: State, event: Event) => State | undefined; +export type Dispatch = (event: Event) => void; +export type Summarizer = ( + prev: State, + event: Event, + injectEvent: Dispatch +) => State | undefined; /** * Transform an event stream into a state update stream which emits * the events and individual states for each event. */ -export const summarizeEvent$ = ( +export const summarizeEventStream = ( event$: Rx.Observable, initialState: State, reducer: Summarizer ) => { - const initUpdate: Update = { - state: initialState, - }; - - return event$.pipe( - scan((prev, event): Update => { - const newState = reducer(prev.state, event); - return newState === undefined - ? prev - : { + return new Rx.Observable>(subscriber => { + const eventBuffer: Event[] = []; + + let processingEventBuffer = false; + let eventStreamComplete = false; + let previousState = initialState; + + const injectEvent = (nextEvent: Event) => { + eventBuffer.push(nextEvent); + + if (processingEventBuffer) { + return; + } + + try { + processingEventBuffer = true; + + while (eventBuffer.length && !subscriber.closed) { + const event = eventBuffer.shift()!; + const nextState = reducer(previousState, event, injectEvent); + + if (nextState === undefined) { + // skip this event + continue; + } + + // emit state update + previousState = nextState; + subscriber.next({ event, - state: newState, - }; - }, initUpdate), - distinctUntilChanged(), - startWith(initUpdate) - ); + state: nextState, + }); + } + + if (eventStreamComplete) { + subscriber.complete(); + } + } catch (error) { + subscriber.error(error); + } finally { + processingEventBuffer = false; + } + }; + + // send initial "update" + subscriber.next({ + state: initialState, + }); + + // inject all subsequent events to the internal eventBuffer + subscriber.add( + event$.subscribe( + injectEvent, + error => { + subscriber.error(error); + }, + () => { + eventStreamComplete = true; + if (!processingEventBuffer && eventBuffer.length === 0) { + subscriber.complete(); + } + } + ) + ); + }); }; diff --git a/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts b/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts index b35788009dd92..fec31cbe40dfe 100644 --- a/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts +++ b/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts @@ -179,6 +179,7 @@ it('uses cache on second run and exist cleanly', async () => { "initializing", "initializing", "initialized", + "success", ] `); }); diff --git a/packages/kbn-optimizer/src/log_optimizer_state.ts b/packages/kbn-optimizer/src/log_optimizer_state.ts index 1ee4e47bfd9ee..5217581d1b413 100644 --- a/packages/kbn-optimizer/src/log_optimizer_state.ts +++ b/packages/kbn-optimizer/src/log_optimizer_state.ts @@ -77,10 +77,6 @@ export function logOptimizerState(log: ToolingLog, config: OptimizerConfig) { loggedInit = true; log.info(`initialized, ${state.offlineBundles.length} bundles cached`); } - - if (state.onlineBundles.length === 0) { - log.success(`all bundles cached, success after ${state.durSec}`); - } return; } @@ -123,10 +119,16 @@ export function logOptimizerState(log: ToolingLog, config: OptimizerConfig) { if (state.phase === 'success') { const buildCount = bundlesThatWereBuilt.size; bundlesThatWereBuilt.clear(); - log.success( - `${buildCount} bundles compiled successfully after ${state.durSec} sec` + - (config.watch ? ', watching for changes' : '') - ); + + if (state.offlineBundles.length && buildCount === 0) { + log.success(`all bundles cached, success after ${state.durSec} sec`); + } else { + log.success( + `${buildCount} bundles compiled successfully after ${state.durSec} sec` + + (config.watch ? ', watching for changes' : '') + ); + } + return true; } diff --git a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts index 9587902cc4187..fe2fa320818a2 100644 --- a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts +++ b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts @@ -44,11 +44,6 @@ export function handleOptimizerCompletion(config: OptimizerConfig) { return; } - if (prevState?.phase === 'initialized' && prevState.onlineBundles.length === 0) { - // all bundles cached - return; - } - if (prevState?.phase === 'issue') { throw createFailError('webpack issue'); } diff --git a/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts b/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts index c1e6572bd7e75..f5122461bff80 100644 --- a/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts +++ b/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts @@ -30,8 +30,13 @@ export interface OptimizerInitializedEvent { type: 'optimizer initialized'; } +export interface AllBundlesCachedEvent { + type: 'all bundles cached'; +} + export type OptimizerEvent = | OptimizerInitializedEvent + | AllBundlesCachedEvent | ChangeEvent | WorkerMsg | WorkerStatus @@ -95,13 +100,25 @@ function getStatePhase(states: CompilerMsg[]) { export function createOptimizerReducer( config: OptimizerConfig ): Summarizer { - return (state, event) => { + return (state, event, injectEvent) => { if (event.type === 'optimizer initialized') { + if (state.onlineBundles.length === 0) { + injectEvent({ + type: 'all bundles cached', + }); + } + return createOptimizerState(state, { phase: 'initialized', }); } + if (event.type === 'all bundles cached') { + return createOptimizerState(state, { + phase: 'success', + }); + } + if (event.type === 'worker error' || event.type === 'compiler error') { // unrecoverable error states const error = new Error(event.errorMsg); diff --git a/packages/kbn-optimizer/src/run_optimizer.ts b/packages/kbn-optimizer/src/run_optimizer.ts index d2daa79feab7e..a4f14b187407f 100644 --- a/packages/kbn-optimizer/src/run_optimizer.ts +++ b/packages/kbn-optimizer/src/run_optimizer.ts @@ -20,7 +20,7 @@ import * as Rx from 'rxjs'; import { mergeMap, share, observeOn } from 'rxjs/operators'; -import { summarizeEvent$, Update } from './common'; +import { summarizeEventStream, Update } from './common'; import { OptimizerConfig, @@ -66,7 +66,7 @@ export function runOptimizer(config: OptimizerConfig) { const workerEvent$ = runWorkers(config, cacheKey, bundleCacheEvent$, changeEvent$); // create the stream that summarized all the events into specific states - return summarizeEvent$( + return summarizeEventStream( Rx.merge(init$, changeEvent$, workerEvent$), { phase: 'initializing', From 737214a72f89e34f353e7104c6e8a05b37daaa0a Mon Sep 17 00:00:00 2001 From: spalger Date: Wed, 19 Feb 2020 07:21:51 -0700 Subject: [PATCH 2/3] verify that infinite streams can be broken by unsubscribing --- .../src/common/event_stream_helpers.test.ts | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts index 3b6193e9e417a..51add0b24ccb2 100644 --- a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts +++ b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts @@ -18,7 +18,7 @@ */ import * as Rx from 'rxjs'; -import { toArray } from 'rxjs/operators'; +import { toArray, take } from 'rxjs/operators'; import { summarizeEventStream } from './event_stream_helpers'; @@ -169,3 +169,70 @@ it('interleaves mulitple injected events in order', async () => { ] `); }); + +it('stops an infinite stream when unsubscribed', async () => { + const event$ = Rx.of(1); + const initial = 0; + const reducer = jest.fn((prev, event, injectEvent) => { + // always inject a follow up event, making this infinite and synchronous + injectEvent(event + 1); + return prev + event; + }); + + const values = await summarizeEventStream(event$, initial, reducer) + .pipe(take(11), toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 2, + "state": 3, + }, + Object { + "event": 3, + "state": 6, + }, + Object { + "event": 4, + "state": 10, + }, + Object { + "event": 5, + "state": 15, + }, + Object { + "event": 6, + "state": 21, + }, + Object { + "event": 7, + "state": 28, + }, + Object { + "event": 8, + "state": 36, + }, + Object { + "event": 9, + "state": 45, + }, + Object { + "event": 10, + "state": 55, + }, + ] + `); + + // ensure reducer still only called 10 times after a timeout + expect(reducer).toHaveBeenCalledTimes(10); + await new Promise(resolve => setTimeout(resolve, 1000)); + expect(reducer).toHaveBeenCalledTimes(10); +}); From a3e159d7e7e0f5defb11829ff48ad771b4eb06dc Mon Sep 17 00:00:00 2001 From: spalger Date: Wed, 19 Feb 2020 07:28:44 -0700 Subject: [PATCH 3/3] shift naming a smidge --- .../src/common/event_stream_helpers.test.ts | 12 ++++++------ .../kbn-optimizer/src/common/event_stream_helpers.ts | 8 ++++---- .../optimizer/handle_optimizer_completion.test.ts | 2 +- .../src/optimizer/handle_optimizer_completion.ts | 2 +- packages/kbn-optimizer/src/optimizer/index.ts | 2 +- .../{optimizer_reducer.ts => optimizer_state.ts} | 2 +- packages/kbn-optimizer/src/run_optimizer.ts | 4 ++-- 7 files changed, 16 insertions(+), 16 deletions(-) rename packages/kbn-optimizer/src/optimizer/{optimizer_reducer.ts => optimizer_state.ts} (99%) diff --git a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts index 51add0b24ccb2..f6f6841532799 100644 --- a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts +++ b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts @@ -22,7 +22,7 @@ import { toArray, take } from 'rxjs/operators'; import { summarizeEventStream } from './event_stream_helpers'; -it('emits each state with each event, ignoring events when reducer returns undefined', async () => { +it('emits each state with each event, ignoring events when summarizer returns undefined', async () => { const event$ = Rx.of(1, 2, 3, 4, 5); const initial = 0; const values = await summarizeEventStream(event$, initial, (state, event) => { @@ -173,13 +173,13 @@ it('interleaves mulitple injected events in order', async () => { it('stops an infinite stream when unsubscribed', async () => { const event$ = Rx.of(1); const initial = 0; - const reducer = jest.fn((prev, event, injectEvent) => { + const summarize = jest.fn((prev, event, injectEvent) => { // always inject a follow up event, making this infinite and synchronous injectEvent(event + 1); return prev + event; }); - const values = await summarizeEventStream(event$, initial, reducer) + const values = await summarizeEventStream(event$, initial, summarize) .pipe(take(11), toArray()) .toPromise(); @@ -231,8 +231,8 @@ it('stops an infinite stream when unsubscribed', async () => { ] `); - // ensure reducer still only called 10 times after a timeout - expect(reducer).toHaveBeenCalledTimes(10); + // ensure summarizer still only called 10 times after a timeout + expect(summarize).toHaveBeenCalledTimes(10); await new Promise(resolve => setTimeout(resolve, 1000)); - expect(reducer).toHaveBeenCalledTimes(10); + expect(summarize).toHaveBeenCalledTimes(10); }); diff --git a/packages/kbn-optimizer/src/common/event_stream_helpers.ts b/packages/kbn-optimizer/src/common/event_stream_helpers.ts index e36e9c77ebd13..d07af32f88897 100644 --- a/packages/kbn-optimizer/src/common/event_stream_helpers.ts +++ b/packages/kbn-optimizer/src/common/event_stream_helpers.ts @@ -24,11 +24,11 @@ export interface Update { state: State; } -export type Dispatch = (event: Event) => void; +export type EventInjector = (event: Event) => void; export type Summarizer = ( prev: State, event: Event, - injectEvent: Dispatch + injectEvent: EventInjector ) => State | undefined; /** @@ -38,7 +38,7 @@ export type Summarizer = ( export const summarizeEventStream = ( event$: Rx.Observable, initialState: State, - reducer: Summarizer + summarize: Summarizer ) => { return new Rx.Observable>(subscriber => { const eventBuffer: Event[] = []; @@ -59,7 +59,7 @@ export const summarizeEventStream = ( while (eventBuffer.length && !subscriber.closed) { const event = eventBuffer.shift()!; - const nextState = reducer(previousState, event, injectEvent); + const nextState = summarize(previousState, event, injectEvent); if (nextState === undefined) { // skip this event diff --git a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts index 7a8575a6c91fe..3cc58e744a7b9 100644 --- a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts +++ b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts @@ -22,7 +22,7 @@ import { REPO_ROOT } from '@kbn/dev-utils'; import { Update } from '../common'; -import { OptimizerState } from './optimizer_reducer'; +import { OptimizerState } from './optimizer_state'; import { OptimizerConfig } from './optimizer_config'; import { handleOptimizerCompletion } from './handle_optimizer_completion'; import { toArray } from 'rxjs/operators'; diff --git a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts index fe2fa320818a2..5dd500cd7a9e4 100644 --- a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts +++ b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts @@ -23,7 +23,7 @@ import { createFailError } from '@kbn/dev-utils'; import { pipeClosure, Update } from '../common'; -import { OptimizerState } from './optimizer_reducer'; +import { OptimizerState } from './optimizer_state'; import { OptimizerConfig } from './optimizer_config'; export function handleOptimizerCompletion(config: OptimizerConfig) { diff --git a/packages/kbn-optimizer/src/optimizer/index.ts b/packages/kbn-optimizer/src/optimizer/index.ts index 3df8ed9302668..84fd395e98976 100644 --- a/packages/kbn-optimizer/src/optimizer/index.ts +++ b/packages/kbn-optimizer/src/optimizer/index.ts @@ -19,7 +19,7 @@ export * from './optimizer_config'; export { WorkerStdio } from './observe_worker'; -export * from './optimizer_reducer'; +export * from './optimizer_state'; export * from './cache_keys'; export * from './watch_bundles_for_changes'; export * from './run_workers'; diff --git a/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts b/packages/kbn-optimizer/src/optimizer/optimizer_state.ts similarity index 99% rename from packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts rename to packages/kbn-optimizer/src/optimizer/optimizer_state.ts index f5122461bff80..ac2a9b8ce1f8b 100644 --- a/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts +++ b/packages/kbn-optimizer/src/optimizer/optimizer_state.ts @@ -97,7 +97,7 @@ function getStatePhase(states: CompilerMsg[]) { throw new Error(`unable to summarize bundle states: ${JSON.stringify(states)}`); } -export function createOptimizerReducer( +export function createOptimizerStateSummarizer( config: OptimizerConfig ): Summarizer { return (state, event, injectEvent) => { diff --git a/packages/kbn-optimizer/src/run_optimizer.ts b/packages/kbn-optimizer/src/run_optimizer.ts index a4f14b187407f..ab12cc679bc62 100644 --- a/packages/kbn-optimizer/src/run_optimizer.ts +++ b/packages/kbn-optimizer/src/run_optimizer.ts @@ -31,7 +31,7 @@ import { watchBundlesForChanges$, runWorkers, OptimizerInitializedEvent, - createOptimizerReducer, + createOptimizerStateSummarizer, handleOptimizerCompletion, } from './optimizer'; @@ -76,7 +76,7 @@ export function runOptimizer(config: OptimizerConfig) { startTime, durSec: 0, }, - createOptimizerReducer(config) + createOptimizerStateSummarizer(config) ); }), handleOptimizerCompletion(config)