Skip to content

Commit

Permalink
Merge branch 'main' into fix-81576
Browse files Browse the repository at this point in the history
  • Loading branch information
kibanamachine authored Nov 9, 2021
2 parents f40a506 + 06ab784 commit 771042e
Show file tree
Hide file tree
Showing 163 changed files with 3,220 additions and 3,021 deletions.
16 changes: 12 additions & 4 deletions src/plugins/discover/public/application/context/context_app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,29 @@ export const ContextApp = ({ indexPattern, anchorId }: ContextAppProps) => {
/**
* Context fetched state
*/
const { fetchedState, fetchContextRows, fetchAllRows, fetchSurroundingRows } = useContextAppFetch(
{
const { fetchedState, fetchContextRows, fetchAllRows, fetchSurroundingRows, resetFetchedState } =
useContextAppFetch({
anchorId,
indexPattern,
appState,
useNewFieldsApi,
services,
});
/**
* Reset state when anchor changes
*/
useEffect(() => {
if (prevAppState.current) {
prevAppState.current = undefined;
resetFetchedState();
}
);
}, [anchorId, resetFetchedState]);

/**
* Fetch docs on ui changes
*/
useEffect(() => {
if (!prevAppState.current || fetchedState.anchor._id !== anchorId) {
if (!prevAppState.current) {
fetchAllRows();
} else if (prevAppState.current.predecessorCount !== appState.predecessorCount) {
fetchSurroundingRows(SurrDocType.PREDECESSORS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,19 @@ export function useContextAppFetch({
[fetchSurroundingRows]
);

const fetchAllRows = useCallback(
() => fetchAnchorRow().then((anchor) => anchor && fetchContextRows(anchor)),
[fetchAnchorRow, fetchContextRows]
);
const fetchAllRows = useCallback(() => {
fetchAnchorRow().then((anchor) => anchor && fetchContextRows(anchor));
}, [fetchAnchorRow, fetchContextRows]);

const resetFetchedState = useCallback(() => {
setFetchedState(getInitialContextQueryState());
}, []);

return {
fetchedState,
fetchAllRows,
fetchContextRows,
fetchSurroundingRows,
resetFetchedState,
};
}
103 changes: 52 additions & 51 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
from,
isObservable,
of,
race,
throwError,
Observable,
ReplaySubject,
Expand All @@ -25,7 +24,7 @@ import { catchError, finalize, map, pluck, shareReplay, switchMap, tap } from 'r
import { Executor } from '../executor';
import { createExecutionContainer, ExecutionContainer } from './container';
import { createError } from '../util';
import { abortSignalToPromise, now } from '../../../kibana_utils/common';
import { now, AbortError } from '../../../kibana_utils/common';
import { Adapters } from '../../../inspector/common';
import { isExpressionValueError, ExpressionValueError } from '../expression_types/specs/error';
import {
Expand All @@ -50,13 +49,6 @@ type UnwrapReturnType<Function extends (...args: any[]) => unknown> =
? UnwrapObservable<ReturnType<Function>>
: UnwrapPromiseOrReturn<ReturnType<Function>>;

// type ArgumentsOf<Function extends ExpressionFunction> = Function extends ExpressionFunction<
// unknown,
// infer Arguments
// >
// ? Arguments
// : never;

/**
* The result returned after an expression function execution.
*/
Expand Down Expand Up @@ -95,6 +87,51 @@ const createAbortErrorValue = () =>
name: 'AbortError',
});

function markPartial<T>() {
return (source: Observable<T>) =>
new Observable<ExecutionResult<T>>((subscriber) => {
let latest: ExecutionResult<T> | undefined;

subscriber.add(
source.subscribe({
next: (result) => {
latest = { result, partial: true };
subscriber.next(latest);
},
error: (error) => subscriber.error(error),
complete: () => {
if (latest) {
latest.partial = false;
}

subscriber.complete();
},
})
);

subscriber.add(() => {
latest = undefined;
});
});
}

function takeUntilAborted<T>(signal: AbortSignal) {
return (source: Observable<T>) =>
new Observable<T>((subscriber) => {
const throwAbortError = () => {
subscriber.error(new AbortError());
};

subscriber.add(source.subscribe(subscriber));
subscriber.add(() => signal.removeEventListener('abort', throwAbortError));

signal.addEventListener('abort', throwAbortError);
if (signal.aborted) {
throwAbortError();
}
});
}

export interface ExecutionParams {
executor: Executor;
ast?: ExpressionAstExpression;
Expand Down Expand Up @@ -138,18 +175,6 @@ export class Execution<
*/
private readonly abortController = getNewAbortController();

/**
* Promise that rejects if/when abort controller sends "abort" signal.
*/
private readonly abortRejection = abortSignalToPromise(this.abortController.signal);

/**
* Races a given observable against the "abort" event of `abortController`.
*/
private race<T>(observable: Observable<T>): Observable<T> {
return race(from(this.abortRejection.promise), observable);
}

/**
* Whether .start() method has been called.
*/
Expand Down Expand Up @@ -221,32 +246,9 @@ export class Execution<

this.result = this.input$.pipe(
switchMap((input) =>
this.race(this.invokeChain<Output>(this.state.get().ast.chain, input)).pipe(
(source) =>
new Observable<ExecutionResult<Output>>((subscriber) => {
let latest: ExecutionResult<Output> | undefined;

subscriber.add(
source.subscribe({
next: (result) => {
latest = { result, partial: true };
subscriber.next(latest);
},
error: (error) => subscriber.error(error),
complete: () => {
if (latest) {
latest.partial = false;
}

subscriber.complete();
},
})
);

subscriber.add(() => {
latest = undefined;
});
})
this.invokeChain<Output>(this.state.get().ast.chain, input).pipe(
takeUntilAborted(this.abortController.signal),
markPartial()
)
),
catchError((error) => {
Expand All @@ -265,7 +267,6 @@ export class Execution<
},
error: (error) => this.state.transitions.setError(error),
}),
finalize(() => this.abortRejection.cleanup()),
shareReplay(1)
);
}
Expand Down Expand Up @@ -356,9 +357,9 @@ export class Execution<
// `resolveArgs` returns an object because the arguments themselves might
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.race(this.resolveArgs(fn, currentInput, fnArgs)).pipe(
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(link.debug, { args })),
switchMap((args) => this.race(this.invokeFunction(fn, currentInput, args))),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(link.debug, { output })),
catchError((rawError) => {
Expand Down Expand Up @@ -390,7 +391,7 @@ export class Execution<
): Observable<UnwrapReturnType<Fn['fn']>> {
return of(input).pipe(
map((currentInput) => this.cast(currentInput, fn.inputTypes)),
switchMap((normalizedInput) => this.race(of(fn.fn(normalizedInput, args, this.context)))),
switchMap((normalizedInput) => of(fn.fn(normalizedInput, args, this.context))),
switchMap(
(fnResult) =>
(isObservable(fnResult)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import React from 'react';
import { EuiLoadingChart } from '@elastic/eui';

export const TimeseriesLoading = () => (
<div className="visChart__spinner">
<EuiLoadingChart mono size="l" />
</div>
);
Loading

0 comments on commit 771042e

Please sign in to comment.