Skip to content

Commit

Permalink
feat(sdk-metrics-base): async instruments callback timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed May 9, 2022
1 parent 858f6ce commit 05b0e6e
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 50 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to experimental packages in this project will be documented
### :rocket: (Enhancement)

* feat(exporters): update proto version and use otlp-transformer #2929 @pichlermarc
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export type MetricData = SingularMetricData | HistogramMetricData;
export interface InstrumentationLibraryMetrics {
instrumentationLibrary: InstrumentationLibrary;
metrics: MetricData[];
errors: unknown[];
}

export interface ResourceMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import { ResourceMetrics } from './MetricData';

export interface MetricCollectOptions {
timeoutMillis?: number;
}

/**
* This is a public interface that represent an export state of a MetricReader.
*/
export interface MetricProducer {
collect(): Promise<ResourceMetrics>;
collect(options?: MetricCollectOptions): Promise<ResourceMetrics>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ export abstract class MetricReader {
return undefined;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
return this._metricProducer.collect({
timeoutMillis: options?.timeoutMillis,
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { HrTime } from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { MetricCollectOptions } from '..';
import { InstrumentationLibraryMetrics } from '../export/MetricData';
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
import { Meter } from '../Meter';
Expand Down Expand Up @@ -76,12 +77,12 @@ export class MeterSharedState {
* @param collectionTime the HrTime at which the collection was initiated.
* @returns the list of {@link MetricData} collected.
*/
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
async collect(collector: MetricCollectorHandle, collectionTime: HrTime, options?: MetricCollectOptions): Promise<InstrumentationLibraryMetrics> {
/**
* 1. Call all observable callbacks first.
* 2. Collect metric result for the collector.
*/
await this._observableRegistry.observe();
const errors = await this._observableRegistry.observe(options?.timeoutMillis);
const metricDataList = Array.from(this._metricStorageRegistry.getStorages())
.map(metricStorage => {
return metricStorage.collect(
Expand All @@ -95,6 +96,7 @@ export class MeterSharedState {
return {
instrumentationLibrary: this._instrumentationLibrary,
metrics: metricDataList.filter(isNotNullish),
errors,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { hrTime } from '@opentelemetry/core';
import { AggregationTemporality } from '../export/AggregationTemporality';
import { ResourceMetrics } from '../export/MetricData';
import { MetricProducer } from '../export/MetricProducer';
import { MetricProducer, MetricCollectOptions } from '../export/MetricProducer';
import { MetricReader } from '../export/MetricReader';
import { ForceFlushOptions, ShutdownOptions } from '../types';
import { MeterProviderSharedState } from './MeterProviderSharedState';
Expand All @@ -33,10 +33,10 @@ export class MetricCollector implements MetricProducer {
this.aggregatorTemporality = this._metricReader.getPreferredAggregationTemporality();
}

async collect(): Promise<ResourceMetrics> {
async collect(options?: MetricCollectOptions): Promise<ResourceMetrics> {
const collectionTime = hrTime();
const meterCollectionPromises = Array.from(this._sharedState.meterSharedStates.values())
.map(meterSharedState => meterSharedState.collect(this, collectionTime));
.map(meterSharedState => meterSharedState.collect(this, collectionTime, options));
const instrumentationLibraryMetrics = await Promise.all(meterCollectionPromises);

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableResult } from '../ObservableResult';
import { callWithTimeout, PromiseAllSettled, PromiseAllSettledRejectionResult } from '../utils';
import { AsyncWritableMetricStorage } from './WritableMetricStorage';

/**
Expand All @@ -35,19 +36,26 @@ export class ObservableRegistry {
this._callbacks.push([callback, metricStorage]);
}

async observe(): Promise<void> {
/**
* @returns a promise of rejected reasons for invoking callbacks.
*/
async observe(timeoutMillis?: number): Promise<unknown[]> {
// TODO: batch observables
// https://github.com/open-telemetry/opentelemetry-specification/pull/2363
const promise = Promise.all(this._callbacks
const results = await PromiseAllSettled(this._callbacks
.map(async ([observableCallback, metricStorage]) => {
const observableResult = new ObservableResult();
// TODO: timeout with callback
// https://github.com/open-telemetry/opentelemetry-specification/issues/2295
await observableCallback(observableResult);
let callPromise: Promise<void> = Promise.resolve(observableCallback(observableResult));
if (timeoutMillis != null) {
callPromise = callWithTimeout(callPromise, timeoutMillis);
}
await callPromise;
metricStorage.record(observableResult.buffer);
})
);

await promise;
const rejections = results.filter((it): it is PromiseAllSettledRejectionResult => it.status === 'rejected')
.map(it => it.reason);
return rejections;
}
}
32 changes: 32 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,35 @@ export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promis
throw reason;
});
}

export interface PromiseAllSettledFulfillResult<T> {
status: 'fulfilled';
value: T;
}

export interface PromiseAllSettledRejectionResult {
status: 'rejected';
reason: unknown;
}

export type PromiseAllSettledResult<T> = PromiseAllSettledFulfillResult<T> | PromiseAllSettledRejectionResult;

/**
* Node.js v12.9 lower and browser compatible `Promise.allSettled`.
*/
export async function PromiseAllSettled<T>(promises: Promise<T>[]): Promise<PromiseAllSettledResult<T>[]> {
return Promise.all(promises.map<Promise<PromiseAllSettledResult<T>>>(async p => {
try {
const ret = await p;
return {
status: 'fulfilled',
value: ret,
};
} catch (e) {
return {
status: 'rejected',
reason: e,
};
}
}));
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ class TestDeltaMetricExporter extends TestMetricExporter {
const emptyResourceMetrics = { resource: defaultResource, instrumentationLibraryMetrics: [] };

class TestMetricProducer implements MetricProducer {
public collectionTime = 0;

async collect(): Promise<ResourceMetrics> {
await new Promise(resolve => setTimeout(resolve, this.collectionTime));
return { resource: defaultResource, instrumentationLibraryMetrics: [] };
}
}
Expand Down Expand Up @@ -395,21 +392,22 @@ describe('PeriodicExportingMetricReader', () => {
assert.deepStrictEqual(await reader.collect(), undefined);
});

it('should time out when timeoutMillis is set', async () => {
it('should call MetricProduce.collect with timeout', async () => {
const exporter = new TestMetricExporter();
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: MAX_32_BIT_INT,
exportTimeoutMillis: 80,
});
const producer = new TestMetricProducer();
producer.collectionTime = 40;
reader.setMetricProducer(producer);

await assertRejects(
() => reader.collect({ timeoutMillis: 20 }),
TimeoutError
);
const collectStub = sinon.stub(producer, 'collect');

await reader.collect({ timeoutMillis: 20 });
assert(collectStub.calledOnce);
const args = collectStub.args[0];
assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]);

await reader.shutdown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import { DataPointType } from '../../src/export/MetricData';
import { MetricCollectorHandle } from '../../src/state/MetricCollector';
import { AsyncMetricStorage } from '../../src/state/AsyncMetricStorage';
import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor';
import { assertDataPoint, assertMetricData, defaultInstrumentDescriptor } from '../util';
import { ObservableCallback } from '@opentelemetry/api-metrics';
import { ObservableRegistry } from '../../src/state/ObservableRegistry';
import { assertMetricData, assertDataPoint, defaultInstrumentDescriptor, ObservableCallbackDelegate } from '../util';

const deltaCollector: MetricCollectorHandle = {
aggregatorTemporality: AggregationTemporality.DELTA,
Expand All @@ -37,24 +36,6 @@ const cumulativeCollector: MetricCollectorHandle = {

const sdkStartTime = hrTime();

class ObservableCallbackDelegate {
private _delegate?: ObservableCallback;
private _callback: ObservableCallback;
constructor() {
this._callback = observableResult => {
this._delegate?.(observableResult);
};
}

setDelegate(delegate: ObservableCallback) {
this._delegate = delegate;
}

getCallback(): ObservableCallback {
return this._callback;
}
}

describe('AsyncMetricStorage', () => {
describe('collect', () => {
describe('Delta Collector', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import * as assert from 'assert';
import * as sinon from 'sinon';
import { MeterProvider } from '../../src';
import { MeterProvider, TimeoutError } from '../../src';
import { DataPointType } from '../../src/export/MetricData';
import { PushMetricExporter } from '../../src/export/MetricExporter';
import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState';
import { MetricCollector } from '../../src/state/MetricCollector';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint } from '../util';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertDataPoint, ObservableCallbackDelegate } from '../util';
import { TestMetricReader } from '../export/TestMetricReader';
import { TestDeltaMetricExporter, TestMetricExporter } from '../export/TestMetricExporter';

Expand Down Expand Up @@ -94,5 +94,85 @@ describe('MetricCollector', () => {
assert.strictEqual(metricData2.dataPoints.length, 1);
assertDataPoint(metricData2.dataPoints[0], {}, 3);
});

it('should collect observer metrics with timeout', async () => {
sinon.useFakeTimers();
/** preparing test instrumentations */
const exporter = new TestMetricExporter();
const { metricCollector, meter } = setupInstruments(exporter);

/** creating metric events */

/** observer1 is an abnormal observer */
const delegate1 = new ObservableCallbackDelegate();
meter.createObservableCounter('observer1', delegate1.getCallback());
delegate1.setDelegate(_observableResult => {
return new Promise(() => {
/** promise never settles */
});
});

/** observer2 is a normal observer */
const delegate2 = new ObservableCallbackDelegate();
meter.createObservableCounter('observer2', delegate2.getCallback());
delegate2.setDelegate(observableResult => {
observableResult.observe(1, {});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(200);
const { instrumentationLibraryMetrics } = await future;
const { metrics, errors } = instrumentationLibraryMetrics[0];
assert.strictEqual(metrics.length, 2);
assert.strictEqual(errors.length, 1);
assert(errors[0] instanceof TimeoutError);

/** observer1 */
assertMetricData(metrics[0], DataPointType.SINGULAR, {
name: 'observer1'
});
assert.strictEqual(metrics[0].dataPoints.length, 0);

/** observer2 */
assertMetricData(metrics[1], DataPointType.SINGULAR, {
name: 'observer2'
});
assert.strictEqual(metrics[1].dataPoints.length, 1);
}

/** now the observer1 is back to normal */
delegate1.setDelegate(async observableResult => {
observableResult.observe(100, {});
});

/** collect metrics */
{
const future = metricCollector.collect({
timeoutMillis: 100,
});
sinon.clock.tick(100);
const { instrumentationLibraryMetrics } = await future;
const { metrics, errors } = instrumentationLibraryMetrics[0];
assert.strictEqual(metrics.length, 2);
assert.strictEqual(errors.length, 0);

/** observer1 */
assertMetricData(metrics[0], DataPointType.SINGULAR, {
name: 'observer1'
});
assert.strictEqual(metrics[0].dataPoints.length, 1);
assertDataPoint(metrics[0].dataPoints[0], {}, 100);

/** observer2 */
assertMetricData(metrics[1], DataPointType.SINGULAR, {
name: 'observer2'
});
assert.strictEqual(metrics[1].dataPoints.length, 1);
}
});
});
});
14 changes: 14 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { Measurement } from '../src/Measurement';
import { isNotNullish } from '../src/utils';
import { HrTime } from '@opentelemetry/api';
import { Histogram } from '../src/aggregator/types';
import { ObservableCallback } from '@opentelemetry/api-metrics';

export const defaultResource = new Resource({
resourceKey: 'my-resource',
Expand Down Expand Up @@ -129,3 +130,16 @@ export function assertPartialDeepStrictEqual<T>(actual: unknown, expected: T, me
assert.deepStrictEqual((actual as any)[ownName], (expected as any)[ownName], `${ownName} not equals: ${message ?? '<no-message>'}`);
}
}

export class ObservableCallbackDelegate {
private _delegate?: ObservableCallback;
setDelegate(delegate: ObservableCallback) {
this._delegate = delegate;
}

getCallback(): ObservableCallback {
return observableResult => {
return this._delegate?.(observableResult);
};
}
}
Loading

0 comments on commit 05b0e6e

Please sign in to comment.