From abc64184541acb41d73ea3bea89f22dbe3f92d90 Mon Sep 17 00:00:00 2001 From: Quinn Slack Date: Fri, 16 Aug 2024 02:57:46 -1000 Subject: [PATCH] remove AsyncGenerator API, improve Observable API (#195) Async generators turned out to be too tricky to work with in the Cody repo, so just remove that experimental async generator API. And make the Observable API accept interop observables so we can use `observable-fns` instead of full-blown `rxjs`. --- client/vscode-lib/src/controller.test.ts | 4 - client/vscode-lib/src/controller.ts | 76 +++----- client/vscode-lib/src/util/errorReporter.ts | 40 ---- lib/client/src/client/client.test.ts | 157 ++++++++++------ lib/client/src/client/client.ts | 86 +-------- lib/client/src/client/util.test.ts | 192 -------------------- lib/client/src/client/util.ts | 99 ---------- 7 files changed, 124 insertions(+), 530 deletions(-) delete mode 100644 lib/client/src/client/util.test.ts delete mode 100644 lib/client/src/client/util.ts diff --git a/client/vscode-lib/src/controller.test.ts b/client/vscode-lib/src/controller.test.ts index 0ea82037..74d496fc 100644 --- a/client/vscode-lib/src/controller.test.ts +++ b/client/vscode-lib/src/controller.test.ts @@ -5,16 +5,12 @@ export function createMockController(): MockedObject { return { observeMeta: vi.fn(), meta: vi.fn(), - metaChanges__asyncGenerator: vi.fn(), observeMentions: vi.fn(), mentions: vi.fn(), - mentionsChanges__asyncGenerator: vi.fn(), observeItems: vi.fn(), items: vi.fn(), - itemsChanges__asyncGenerator: vi.fn(), observeAnnotations: vi.fn(), annotations: vi.fn(), - annotationsChanges__asyncGenerator: vi.fn(), } } diff --git a/client/vscode-lib/src/controller.ts b/client/vscode-lib/src/controller.ts index 84e42e22..44eceb13 100644 --- a/client/vscode-lib/src/controller.ts +++ b/client/vscode-lib/src/controller.ts @@ -7,7 +7,18 @@ import { createClient, } from '@openctx/client' import type { ImportedProviderConfiguration } from '@openctx/client/src/configuration.js' -import { type Observable, combineLatest, from, isObservable, map, mergeMap, of } from 'rxjs' +import { + type InteropObservable, + type Observable, + type ObservableInput, + combineLatest, + from, + isObservable, + map, + mergeMap, + of, +} from 'rxjs' +import { isInteropObservable } from 'rxjs/internal/util/isInteropObservable' import * as vscode from 'vscode' import { getClientConfiguration } from './configuration.js' import { initializeOpenCtxGlobal } from './global.js' @@ -26,15 +37,12 @@ type VSCodeClient = Client export interface Controller { observeMeta: VSCodeClient['metaChanges'] meta: VSCodeClient['meta'] - metaChanges__asyncGenerator: VSCodeClient['metaChanges__asyncGenerator'] observeMentions: VSCodeClient['mentionsChanges'] mentions: VSCodeClient['mentions'] - mentionsChanges__asyncGenerator: VSCodeClient['mentionsChanges__asyncGenerator'] observeItems: VSCodeClient['itemsChanges'] items: VSCodeClient['items'] - itemsChanges__asyncGenerator: VSCodeClient['itemsChanges__asyncGenerator'] observeAnnotations( doc: Pick, @@ -43,11 +51,6 @@ export interface Controller { doc: Pick, opts?: ProviderMethodOptions, ): ReturnType - annotationsChanges__asyncGenerator( - doc: Pick, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ): ReturnType } export function createController({ @@ -60,15 +63,15 @@ export function createController({ mergeConfiguration, preloadDelay, }: { - secrets: Observable | vscode.SecretStorage + secrets: + | Observable + | InteropObservable + | vscode.SecretStorage extensionId: string outputChannel: vscode.OutputChannel getAuthInfo?: (secrets: vscode.SecretStorage, providerUri: string) => Promise features: { annotations?: boolean; statusBar?: boolean } - providers?: - | ImportedProviderConfiguration[] - | Observable - | (() => AsyncGenerator) + providers?: ObservableInput mergeConfiguration?: (configuration: ClientConfiguration) => Promise preloadDelay?: number }): { @@ -82,9 +85,10 @@ export function createController({ const globalConfigurationChanges = observeWorkspaceConfigurationChanges('openctx') - const secrets: Observable = isObservable(secretsInput) - ? secretsInput - : of(secretsInput) + const secrets: Observable = + isObservable(secretsInput) || isInteropObservable(secretsInput) + ? from(secretsInput) + : of(secretsInput) // Watch for changes that could possibly affect configuration. This is overbroad because it does // not specify a config scope. @@ -155,10 +159,6 @@ export function createController({ UserAction.Implicit, client.annotationsChanges, ) - const clientAnnotationsChanges__asyncGenerator = errorReporter.wrapAsyncGenerator( - UserAction.Implicit, - client.annotationsChanges__asyncGenerator, - ) /** * The controller is passed to UI feature providers for them to fetch data. @@ -166,24 +166,12 @@ export function createController({ const controller: Controller = { meta: errorReporter.wrapPromise(UserAction.Explicit, client.meta), observeMeta: errorReporter.wrapObservable(UserAction.Explicit, client.metaChanges), - metaChanges__asyncGenerator: errorReporter.wrapAsyncGenerator( - UserAction.Explicit, - client.metaChanges__asyncGenerator, - ), mentions: errorReporter.wrapPromise(UserAction.Explicit, client.mentions), observeMentions: errorReporter.wrapObservable(UserAction.Explicit, client.mentionsChanges), - mentionsChanges__asyncGenerator: errorReporter.wrapAsyncGenerator( - UserAction.Explicit, - client.mentionsChanges__asyncGenerator, - ), items: errorReporter.wrapPromise(UserAction.Explicit, client.items), observeItems: errorReporter.wrapObservable(UserAction.Explicit, client.itemsChanges), - itemsChanges__asyncGenerator: errorReporter.wrapAsyncGenerator( - UserAction.Explicit, - client.itemsChanges__asyncGenerator, - ), async annotations(doc: vscode.TextDocument, opts?: ProviderMethodOptions) { if (ignoreDoc(doc)) { @@ -210,28 +198,6 @@ export function createController({ opts, ) }, - async *annotationsChanges__asyncGenerator( - doc: vscode.TextDocument, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ) { - if (ignoreDoc(doc)) { - return - } - - const g = clientAnnotationsChanges__asyncGenerator( - { - uri: doc.uri.toString(), - content: doc.getText(), - }, - opts, - signal, - ) - for await (const v of g) { - yield v - } - return - }, } if (features.annotations) { diff --git a/client/vscode-lib/src/util/errorReporter.ts b/client/vscode-lib/src/util/errorReporter.ts index 738ab448..a469f5c6 100644 --- a/client/vscode-lib/src/util/errorReporter.ts +++ b/client/vscode-lib/src/util/errorReporter.ts @@ -67,46 +67,6 @@ export class ErrorReporterController implements vscode.Disposable { } } - /** - * wraps providerMethod to ensure it reports errors to the user. - */ - public wrapAsyncGenerator( - userAction: UserAction, - providerMethod: ( - params: T, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ) => AsyncGenerator, - ) { - const getErrorReporter = this.getErrorReporter.bind(this) - return async function* ( - params: T, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ): AsyncGenerator { - const errorReporter = getErrorReporter(userAction, opts) - if (errorReporter.skip) { - return - } - - opts = withErrorHook(opts, (providerUri, error) => { - errorReporter.onError(providerUri, error) - errorReporter.report() - }) - - try { - for await (const value of providerMethod(params, opts, signal)) { - errorReporter.onValue(undefined) - errorReporter.report() - yield value - } - } catch (error) { - errorReporter.onError(undefined, error) - errorReporter.report() - } - } - } - /** * wraps providerMethod to ensure it reports errors to the user. */ diff --git a/lib/client/src/client/client.test.ts b/lib/client/src/client/client.test.ts index 44d55b65..e9c14539 100644 --- a/lib/client/src/client/client.test.ts +++ b/lib/client/src/client/client.test.ts @@ -1,6 +1,6 @@ import type { AnnotationsParams, ItemsParams, MetaResult } from '@openctx/protocol' import type { Item, Range } from '@openctx/schema' -import { Observable, firstValueFrom, of } from 'rxjs' +import { firstValueFrom, of } from 'rxjs' import { TestScheduler } from 'rxjs/testing' import { describe, expect, test } from 'vitest' import type { Annotation, EachWithProviderUri } from '../api.js' @@ -45,75 +45,114 @@ describe('Client', () => { new TestScheduler((actual, expected) => expect(actual).toStrictEqual(expected)) describe('meta', () => { - test('meta with async generator providers option', async () => { + test('promise', async () => { const client = createTestClient({ - configuration: () => of({}), - providers: async function* (): AsyncGenerator { - yield [ - { - provider: { meta: () => ({ name: 'my-provider-1' }) }, - providerUri: 'u1', - settings: true, + configuration: () => + Promise.resolve({ + enable: true, + providers: { + [testdataFileUri('simple.js')]: true, + }, + }), + }) + expect(await client.meta({}, {})).toStrictEqual>([ + { name: 'simple', providerUri: testdataFileUri('simple.js'), annotations: {} }, + ]) + }) + + test('simple', () => { + testScheduler().run(({ cold, expectObservable }) => { + expectObservable( + createTestClient({ + configuration: () => + cold('a', { + a: { + enable: true, + providers: { + [testdataFileUri('simple.js')]: true, + }, + }, + }), + __mock__: { + getProviderClient: () => ({ + meta: (_, settings) => of({ name: 'simple' }), + }), + }, + }).metaChanges({}, {}), + ).toBe('a', { + a: [{ name: 'simple', providerUri: testdataFileUri('simple.js') }], + } satisfies Record>) + }) + }) + + test('changes', () => { + testScheduler().run(({ cold, expectObservable }) => { + expectObservable( + createTestClient({ + authInfo: () => cold('a', { a: null }), + configuration: () => + cold('a-b-c', { + a: { enable: false }, + b: { + enable: true, + providers: { + [testdataFileUri('simpleMeta.js')]: { nameSuffix: '1' }, + }, + }, + c: { + enable: true, + providers: { + [testdataFileUri('simpleMeta.js')]: { nameSuffix: '2' }, + }, + }, + }), + providers: cold('a', { a: [] }), + __mock__: { + getProviderClient: () => ({ + meta: (_, settings) => + of({ name: `simpleMeta-${settings.nameSuffix}`, annotations: {} }), + }), }, - ] - await new Promise(resolve => setTimeout(resolve)) - yield [ + }).metaChanges({}, {}), + ).toBe('a-b-c', { + a: [], + b: [ { - provider: { meta: () => ({ name: 'my-provider-2' }) }, - providerUri: 'u2', - settings: true, + name: 'simpleMeta-1', + providerUri: testdataFileUri('simpleMeta.js'), + annotations: {}, }, + ], + c: [ { - provider: { meta: () => ({ name: 'my-provider-3' }) }, - providerUri: 'u3', - settings: true, + name: 'simpleMeta-2', + providerUri: testdataFileUri('simpleMeta.js'), + annotations: {}, }, - ] - }, + ], + } satisfies Record>) }) - - const values: EachWithProviderUri[] = [] - const signal = new AbortController().signal - for await (const value of client.metaChanges__asyncGenerator({}, {}, signal)) { - values.push(value) - } - expect(values).toStrictEqual([ - [{ name: 'my-provider-1', providerUri: 'u1' }], - [{ name: 'my-provider-2', providerUri: 'u2' }], - [ - { name: 'my-provider-2', providerUri: 'u2' }, - { name: 'my-provider-3', providerUri: 'u3' }, - ], - ]) }) - test('metaChanges__asyncGenerator', async () => { + test('providers option', async () => { const client = createTestClient({ - configuration: () => - new Observable(observer => { - observer.next({ enable: false, providers: {} }) - observer.next({ - enable: true, - providers: { [testdataFileUri('simpleMeta.js')]: { nameSuffix: '1' } }, - }) - setTimeout(() => { - observer.next({ - enable: true, - providers: { [testdataFileUri('simpleMeta.js')]: { nameSuffix: '2' } }, - }) - observer.complete() - }) - }), + configuration: () => of({ enable: true }), + providers: of([ + { + provider: { meta: () => ({ name: 'my-provider-2' }) }, + providerUri: 'u2', + settings: true, + }, + { + provider: { meta: () => ({ name: 'my-provider-3' }) }, + providerUri: 'u3', + settings: true, + }, + ]), }) - - const values: EachWithProviderUri[] = [] - const signal = new AbortController().signal - for await (const value of client.metaChanges__asyncGenerator({}, {}, signal)) { - values.push(value) - } - expect(values).toStrictEqual([ - [{ name: 'simpleMeta-1', providerUri: testdataFileUri('simpleMeta.js') }], - [{ name: 'simpleMeta-2', providerUri: testdataFileUri('simpleMeta.js') }], + expect(await client.meta({}, {})).toStrictEqual>([ + { name: 'my-provider-2', providerUri: 'u2' }, + { name: 'my-provider-3', providerUri: 'u3' }, ]) }) }) diff --git a/lib/client/src/client/client.ts b/lib/client/src/client/client.ts index 6c0f344f..1b224887 100644 --- a/lib/client/src/client/client.ts +++ b/lib/client/src/client/client.ts @@ -20,7 +20,6 @@ import { distinctUntilChanged, firstValueFrom, from, - isObservable, map, mergeMap, of, @@ -46,7 +45,6 @@ import { } from '../configuration.js' import type { Logger } from '../logger.js' import { type ProviderClient, createProviderClient } from '../providerClient/createProviderClient.js' -import { asyncGeneratorToObservable, observableToAsyncGenerator } from './util.js' /** * Hooks for the OpenCtx {@link Client} to access information about the environment, such as the @@ -76,10 +74,7 @@ export interface ClientEnv { /** * The list of providers already resolved and imported. */ - providers?: - | ImportedProviderConfiguration[] - | Observable - | (() => AsyncGenerator) + providers?: ObservableInput /** * The authentication info for the provider. @@ -180,20 +175,6 @@ export interface Client { opts?: ProviderMethodOptions, ): Observable> - /** - * Observe information about the configured providers using an async generator. - * - * The returned generator streams information as it is received from the providers and continues - * passing along any updates until {@link signal} is aborted. - * - * @internal - */ - metaChanges__asyncGenerator( - params: MetaParams, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ): AsyncGenerator> - /** * Get the mentions returned by the configured providers. * @@ -218,20 +199,6 @@ export interface Client { opts?: ProviderMethodOptions, ): Observable> - /** - * Observe OpenCtx mentions from the configured providers using an async generator. - * - * The returned generator streams mentions as they are received from the providers and continues - * passing along any updates until {@link signal} is aborted. - * - * @internal - */ - mentionsChanges__asyncGenerator( - params: MentionsParams, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ): AsyncGenerator> - /** * Get the items returned by the configured providers. * @@ -253,21 +220,6 @@ export interface Client { opts?: ProviderMethodOptions, ): Observable> - /** - * Observe OpenCtx items from the configured providers for the given resource using an async - * generator. - * - * The returned generator streams items as they are received from the providers and continues - * passing along any updates until {@link signal} is aborted. - * - * @internal - */ - itemsChanges__asyncGenerator( - params: ItemsParams, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ): AsyncGenerator> - /** * Get the annotations returned by the configured providers for the given resource. * @@ -292,21 +244,6 @@ export interface Client { opts?: ProviderMethodOptions, ): Observable[]>> - /** - * Observe OpenCtx annotations from the configured providers for the given resource using an - * async generator. - * - * The returned generator streams annotations as they are received from the providers and - * continues passing along any updates until {@link signal} is aborted. - * - * @internal - */ - annotationsChanges__asyncGenerator( - params: AnnotationsParams, - opts?: ProviderMethodOptions, - signal?: AbortSignal, - ): AsyncGenerator[]>> - /** * Dispose of the client and release all resources. */ @@ -341,12 +278,10 @@ export function createClient(env: ClientEnv): Client { } function providerClientsWithSettings(resource?: string): Observable { - const providers = isObservable(env.providers) - ? env.providers - : env.providers instanceof Function - ? asyncGeneratorToObservable(env.providers()) - : of(env.providers) - return combineLatest([env.configuration(resource), providers]) + return combineLatest([ + env.configuration(resource), + env.providers ? env.providers : of(undefined), + ]) .pipe( map(([config, providers]) => { if (!config.enable) { @@ -474,32 +409,21 @@ export function createClient(env: ClientEnv): Client { defaultValue: [], }), metaChanges: (params, opts) => metaChanges(params, { ...opts, emitPartial: true }), - metaChanges__asyncGenerator: (params, opts, signal) => - observableToAsyncGenerator(metaChanges(params, { ...opts, emitPartial: true }), signal), mentions: (params, opts) => firstValueFrom(mentionsChanges(params, { ...opts, emitPartial: false }), { defaultValue: [], }), mentionsChanges: (params, opts) => mentionsChanges(params, { ...opts, emitPartial: true }), - mentionsChanges__asyncGenerator: (params, opts, signal) => - observableToAsyncGenerator(mentionsChanges(params, { ...opts, emitPartial: true }), signal), items: (params, opts) => firstValueFrom(itemsChanges(params, { ...opts, emitPartial: false }), { defaultValue: [], }), itemsChanges: (params, opts) => itemsChanges(params, { ...opts, emitPartial: true }), - itemsChanges__asyncGenerator: (params, opts, signal) => - observableToAsyncGenerator(itemsChanges(params, { ...opts, emitPartial: true }), signal), annotations: (params, opts) => firstValueFrom(annotationsChanges(params, { ...opts, emitPartial: false }), { defaultValue: [], }), annotationsChanges: (params, opts) => annotationsChanges(params, { ...opts, emitPartial: true }), - annotationsChanges__asyncGenerator: (params, opts, signal) => - observableToAsyncGenerator( - annotationsChanges(params, { ...opts, emitPartial: true }), - signal, - ), dispose() { for (const sub of subscriptions) { sub.unsubscribe() diff --git a/lib/client/src/client/util.test.ts b/lib/client/src/client/util.test.ts deleted file mode 100644 index 2e7da29e..00000000 --- a/lib/client/src/client/util.test.ts +++ /dev/null @@ -1,192 +0,0 @@ -import { Observable } from 'rxjs' -import { describe, expect, test } from 'vitest' -import { asyncGeneratorToObservable, isAsyncGenerator, observableToAsyncGenerator } from './util.js' - -describe('observableToAsyncGenerator', () => { - test('observable that emits and completes', async () => { - const testObservable = new Observable(observer => { - observer.next(1) - observer.next(2) - observer.complete() - }) - - const results: number[] = [] - for await (const value of observableToAsyncGenerator(testObservable)) { - results.push(value) - } - expect(results).toEqual([1, 2]) - }) - - test('observable that immediately completes', async () => { - const testObservable = new Observable(observer => { - observer.complete() - }) - - const results: number[] = [] - for await (const value of observableToAsyncGenerator(testObservable)) { - results.push(value) - } - expect(results).toEqual([]) - }) - - test('observable that emits then errors', async () => { - const testObservable = new Observable(observer => { - observer.next(1) - observer.error('x') - }) - - const results: number[] = [] - let thrown: any - try { - for await (const value of observableToAsyncGenerator(testObservable)) { - results.push(value) - } - } catch (error) { - thrown = error - } - expect(results).toEqual([1]) - expect(thrown).toBe('x') - }) - - test('with an AbortSignal', async () => { - const INTERVAL = 10 - const testObservable = new Observable(observer => { - let i = 1 - const intervalHandle = setInterval(() => { - observer.next(i++) - }, INTERVAL) - return () => clearTimeout(intervalHandle) - }) - - const abortController = new AbortController() - const results: number[] = [] - for await (const value of observableToAsyncGenerator(testObservable, abortController.signal)) { - results.push(value) - if (value === 5) { - abortController.abort() - } - } - expect(results).toEqual([1, 2, 3, 4, 5]) - }) -}) -describe('asyncGeneratorToObservable', () => { - function readObservable(observable: Observable): Promise { - return new Promise(resolve => { - const results: number[] = [] - observable.subscribe({ - next: value => results.push(value), - complete: () => resolve(results), - }) - }) - } - - test('async generator that yields and completes', async () => { - const observable = asyncGeneratorToObservable( - (async function* () { - yield 1 - yield 2 - yield 3 - })(), - ) - expect(await readObservable(observable)).toEqual([1, 2, 3]) - }) - - test('async generator that throws an error', async () => { - const ERROR = new Error('Test error') - async function* generator() { - yield 1 - throw ERROR - } - - const observable = asyncGeneratorToObservable(generator()) - const results: number[] = [] - let error: Error | null = null - - await new Promise(resolve => { - observable.subscribe({ - next: value => results.push(value), - error: err => { - error = err - resolve() - }, - complete: () => resolve(), - }) - }) - - expect(results).toEqual([1]) - expect(error).toBe(ERROR) - }) - - test('async generator with no yields', async () => { - async function* generator() { - // Empty generator - } - - const observable = asyncGeneratorToObservable(generator()) - let completed = false - - await new Promise(resolve => { - observable.subscribe({ - next: () => expect.fail('should not yield any values'), - complete: () => { - completed = true - resolve() - }, - }) - }) - - expect(completed).toBe(true) - }) -}) - -describe('isAsyncGenerator', () => { - test('true for valid async generator', () => { - async function* validAsyncGenerator() { - yield 1 - } - expect(isAsyncGenerator(validAsyncGenerator())).toBe(true) - }) - - test('false for other values', () => { - expect(isAsyncGenerator(42)).toBe(false) - expect(isAsyncGenerator('string')).toBe(false) - expect(isAsyncGenerator(true)).toBe(false) - expect(isAsyncGenerator(undefined)).toBe(false) - expect(isAsyncGenerator(null)).toBe(false) - expect(isAsyncGenerator({})).toBe(false) - expect(isAsyncGenerator(function regularFunction() {})).toBe(false) - }) - - test('false for async functions', () => { - async function asyncFunction() {} - expect(isAsyncGenerator(asyncFunction)).toBe(false) - }) - - test('false for non-async generator functions', () => { - function* generatorFunction() { - yield 1 - } - expect(isAsyncGenerator(generatorFunction())).toBe(false) - }) - - test('false for objects with some but not all required methods', () => { - const incompleteObject = { - next: () => {}, - throw: () => {}, - [Symbol.asyncIterator]: function () { - return this - }, - } - expect(isAsyncGenerator(incompleteObject)).toBe(false) - }) - - test('false for objects with all methods but incorrect Symbol.asyncIterator implementation', () => { - const incorrectObject = { - next: () => {}, - throw: () => {}, - return: () => {}, - [Symbol.asyncIterator]: () => ({}), - } - expect(isAsyncGenerator(incorrectObject)).toBe(false) - }) -}) diff --git a/lib/client/src/client/util.ts b/lib/client/src/client/util.ts deleted file mode 100644 index e17fc7da..00000000 --- a/lib/client/src/client/util.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { Observable } from 'rxjs' - -export async function* observableToAsyncGenerator( - observable: Observable, - signal?: AbortSignal, -): AsyncGenerator { - const queue: T[] = [] - let thrown: unknown - let resolve: (() => void) | undefined - let reject: ((error: unknown) => void) | undefined - let finished = false - - const subscription = observable.subscribe({ - next: value => { - queue.push(value) - resolve?.() - resolve = undefined - }, - error: error => { - thrown = error - reject?.(thrown) - reject = undefined - }, - complete: () => { - finished = true - resolve?.() - resolve = undefined - }, - }) - - let removeAbortListener: (() => void) | undefined = undefined - if (signal) { - const handler = () => { - resolve?.() - resolve = undefined - finished = true - } - signal.addEventListener('abort', handler) - removeAbortListener = () => { - signal.removeEventListener('abort', handler) - } - } - - try { - while (true) { - if (queue.length > 0) { - yield queue.shift()! - } else if (thrown) { - throw thrown - } else if (finished) { - break - } else { - await new Promise((res, rej) => { - resolve = res - reject = rej - }) - } - } - } finally { - subscription.unsubscribe() - removeAbortListener?.() - } -} - -export function asyncGeneratorToObservable(asyncGenerator: AsyncGenerator): Observable { - return new Observable(observer => { - ;(async () => { - try { - for await (const value of asyncGenerator) { - observer.next(value) - } - observer.complete() - } catch (error) { - observer.error(error) - } - })() - - return () => { - // If the AsyncGenerator has a return method, call it to clean up - if (asyncGenerator.return) { - asyncGenerator.return() - } - } - }) -} - -export function isAsyncGenerator(value: any): value is AsyncGenerator { - if (value === null || typeof value !== 'object') { - return false - } - - return ( - typeof value.next === 'function' && - typeof value.throw === 'function' && - typeof value.return === 'function' && - typeof value[Symbol.asyncIterator] === 'function' && - value[Symbol.asyncIterator]() === value - ) -}