Skip to content

Commit

Permalink
feat(sdk-metrics-base): async instruments callback timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed May 9, 2022
1 parent 858f6ce commit 3e7b382
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export type MetricData = SingularMetricData | HistogramMetricData;
export interface InstrumentationLibraryMetrics {
instrumentationLibrary: InstrumentationLibrary;
metrics: MetricData[];
errors: unknown[];
}

export interface ResourceMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import { ResourceMetrics } from './MetricData';

export interface MetricCollectOptions {
timeoutMillis?: number;
}

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<ResourceMetrics>;
collect(options?: MetricCollectOptions): Promise<ResourceMetrics>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ export abstract class MetricReader {
return undefined;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
return this._metricProducer.collect({
timeoutMillis: options?.timeoutMillis,
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { MetricCollectOptions } from '..';
import { InstrumentationLibraryMetrics } from '../export/MetricData';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { Meter } from '../Meter';
Expand Down Expand Up @@ -76,12 +77,13 @@ export class MeterSharedState {
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise<InstrumentationLibraryMetrics> {
const timeoutMillis = options?.timeoutMillis ?? Infinity;
/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
await this._observableRegistry.observe();
const errors = await this._observableRegistry.observe(timeoutMillis);
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
Expand All @@ -95,6 +97,7 @@ export class MeterSharedState {
return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricDataList.filter(isNotNullish),
errors,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { hrTime } from '@opentelemetry/core';
import { AggregationTemporality } from '../export/AggregationTemporality';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricProducer, MetricCollectOptions } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { ForceFlushOptions, ShutdownOptions } from '../types';
import { MeterProviderSharedState } from './MeterProviderSharedState';
Expand All @@ -33,10 +33,10 @@ export class MetricCollector implements MetricProducer {
this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality();
}

async collect(): Promise<ResourceMetrics> {
async collect(options?: MetricCollectOptions): Promise<ResourceMetrics> {
const collectionTime = hrTime();
const meterCollectionPromises = Array.from(this._sharedState.meterSharedStates.values())
.map(meterSharedState => meterSharedState.collect(this, collectionTime));
.map(meterSharedState => meterSharedState.collect(this, collectionTime, options));
const instrumentationLibraryMetrics = await Promise.all(meterCollectionPromises);

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableResult } from '../ObservableResult';
import { callWithTimeout, PromiseAllSettled, PromiseAllSettledRejectionResult } from '../utils';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
Expand All @@ -35,19 +36,26 @@ export class ObservableRegistry {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
/**
* @returns a promise of rejected reasons for invoking callbacks.
*/
async observe(timeoutMillis = Infinity): Promise<unknown[]> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
const results = await PromiseAllSettled(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
let callPromise: Promise<void> = Promise.resolve(observableCallback(observableResult));
if (timeoutMillis > 0 && timeoutMillis !== Infinity) {
callPromise = callWithTimeout(callPromise, timeoutMillis);
}
await callPromise;
metricStorage.record(observableResult.buffer);
})
);

await promise;
const rejections = results.filter((it): it is PromiseAllSettledRejectionResult => it.status === 'rejected')
.map(it => it.reason);
return rejections;
}
}
32 changes: 32 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,35 @@ export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promis
throw reason;
});
}

export interface PromiseAllSettledFulfillResult<T> {
status: 'fulfilled';
value: T;
}

export interface PromiseAllSettledRejectionResult {
status: 'rejected';
reason: unknown;
}

export type PromiseAllSettledResult<T> = PromiseAllSettledFulfillResult<T> | PromiseAllSettledRejectionResult;

/**
* Node.js v12.9 lower and browser compatible `Promise.allSettled`.
*/
export async function PromiseAllSettled<T>(promises: Promise<T>[]): Promise<PromiseAllSettledResult<T>[]> {
return Promise.all(promises.map<Promise<PromiseAllSettledResult<T>>>(async p => {
try {
const ret = await p;
return {
status: 'fulfilled',
value: ret,
};
} catch (e) {
return {
status: 'rejected',
reason: e,
};
}
}));
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ class TestDeltaMetricExporter extends TestMetricExporter {
const emptyResourceMetrics = { resource: defaultResource, instrumentationLibraryMetrics: [] };

class TestMetricProducer implements MetricProducer {
public collectionTime = 0;

async collect(): Promise<ResourceMetrics> {
await new Promise(resolve => setTimeout(resolve, this.collectionTime));
return { resource: defaultResource, instrumentationLibraryMetrics: [] };
}
}
Expand Down Expand Up @@ -395,21 +392,22 @@ describe('PeriodicExportingMetricReader', () => {
assert.deepStrictEqual(await reader.collect(), undefined);
});

it('should time out when timeoutMillis is set', async () => {
it('should call MetricProduce.collect with timeout', async () => {
const exporter = new TestMetricExporter();
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
exportTimeoutMillis: 80,
});
const producer = new TestMetricProducer();
producer.collectionTime = 40;
reader.setMetricProducer(producer);

await assertRejects(
() => reader.collect({ timeoutMillis: 20 }),
TimeoutError
);
const collectStub = sinon.stub(producer, 'collect');

await reader.collect({ timeoutMillis: 20 });
assert(collectStub.calledOnce);
const args = collectStub.args[0];
assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]);

await reader.shutdown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import { DataPointType } from '../../src/export/MetricData';
import { MetricCollectorHandle } from '../../src/state/MetricCollector';
import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage';
import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor';
import { assertDataPoint, assertMetricData, defaultInstrumentDescriptor } from '../util';
import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableRegistry } from '../../src/state/ObservableRegistry';
import { assertMetricData, assertDataPoint, defaultInstrumentDescriptor, ObservableCallbackDelegate } from '../util';

const deltaCollector: MetricCollectorHandle = {
aggregatorTemporality: AggregationTemporality.DELTA,
Expand All @@ -37,24 +36,6 @@ const cumulativeCollector: MetricCollectorHandle = {

const sdkStartTime = hrTime();

class ObservableCallbackDelegate {
private _delegate?: ObservableCallback;
private _callback: ObservableCallback;
constructor() {
this._callback = observableResult => {
this._delegate?.(observableResult);
};
}

setDelegate(delegate: ObservableCallback) {
this._delegate = delegate;
}

getCallback(): ObservableCallback {
return this._callback;
}
}

describe('AsyncMetricStorage', () => {
describe('collect', () => {
describe('Delta Collector', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import * as assert from 'assert';
import * as sinon from 'sinon';
import { MeterProvider } from '../../src';
import { MeterProvider, TimeoutError } from '../../src';
import { DataPointType } from '../../src/export/MetricData';
import { PushMetricExporter } from '../../src/export/MetricExporter';
import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState';
import { MetricCollector } from '../../src/state/MetricCollector';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint } from '../util';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint, ObservableCallbackDelegate } from '../util';
import { TestMetricReader } from '../export/TestMetricReader';
import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter';

Expand Down Expand Up @@ -94,5 +94,64 @@ describe('MetricCollector', () => {
assert.strictEqual(metricData2.dataPoints.length, 1);
assertDataPoint(metricData2.dataPoints[0], {}, 3);
});

it('should collect with timeout', async () => {
sinon.useFakeTimers();
/** preparing test instrumentations */
const exporter = new TestMetricExporter();
const { metricCollector, meter } = setupInstruments(exporter);

/** creating metric events */
const delegate1 = new ObservableCallbackDelegate();
meter.createObservableCounter('counter1', delegate1.getCallback());
delegate1.setDelegate(_observableResult => {
return new Promise(() => {
/** promise never settles */
});
});

const delegate2 = new ObservableCallbackDelegate();
meter.createObservableCounter('counter2', delegate2.getCallback());
delegate2.setDelegate(observableResult => {
observableResult.observe(1, {});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(200);
const { instrumentationLibraryMetrics } = await future;
const { metrics, errors } = instrumentationLibraryMetrics[0];
assert.strictEqual(metrics.length, 2);
assert.strictEqual(errors.length, 1);
assert(errors[0] instanceof TimeoutError);
}

/** now the counter1 observer is back to normal */
delegate1.setDelegate(async observableResult => {
observableResult.observe(2, {});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(100);
const { instrumentationLibraryMetrics } = await future;
const { metrics, errors } = instrumentationLibraryMetrics[0];
assert.strictEqual(metrics.length, 2);
assert.strictEqual(errors.length, 0);

const metricData1 = metrics[0];
assertMetricData(metricData1, DataPointType.SINGULAR, {
name: 'counter1'
});
assert.strictEqual(metricData1.dataPoints.length, 1);
assertDataPoint(metricData1.dataPoints[0], {}, 2);
}
});
});
});
14 changes: 14 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { Measurement } from '../src/Measurement';
import { isNotNullish } from '../src/utils';
import { HrTime } from '@opentelemetry/api';
import { Histogram } from '../src/aggregator/types';
import { ObservableCallback } from '@opentelemetry/api-metrics';

export const defaultResource = new Resource({
resourceKey: 'my-resource',
Expand Down Expand Up @@ -129,3 +130,16 @@ export function assertPartialDeepStrictEqual<T>(actual: unknown, expected: T, me
assert.deepStrictEqual((actual as any)[ownName], (expected as any)[ownName], `${ownName} not equals: ${message ?? '<no-message>'}`);
}
}

export class ObservableCallbackDelegate {
private _delegate?: ObservableCallback;
setDelegate(delegate: ObservableCallback) {
this._delegate = delegate;
}

getCallback(): ObservableCallback {
return observableResult => {
return this._delegate?.(observableResult);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as sinon from 'sinon';
import { callWithTimeout, TimeoutError } from '../src/utils';
import { assertRejects } from './test-utils';

describe('utils', () => {
afterEach(() => {
sinon.restore();
});

describe('callWithTimeout', () => {
it('should reject if given promise not settled before timeout', async () => {
sinon.useFakeTimers();
const promise = new Promise(() => {
/** promise never settles */
});
assertRejects(callWithTimeout(promise, 100), TimeoutError);
});
});
});

0 comments on commit 3e7b382

Please sign in to comment.