Skip to content

Commit

Permalink
Merge branch 'main' into prom-example-touchup
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarchaud authored Feb 22, 2022
2 parents a8dc6fd + 630a261 commit 896d878
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,34 @@

import { AggregationTemporality } from './AggregationTemporality';
import { MetricData } from './MetricData';
import {
ExportResult,
ExportResultCode,
} from '@opentelemetry/core';


// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricexporter

// TODO should this just be an interface and exporters can implement their own shutdown?
export abstract class MetricExporter {
protected _shutdown = false;
export interface PushMetricExporter {

abstract export(batch: MetricData[]): Promise<void>;
export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void;

abstract forceFlush(): Promise<void>;
forceFlush(): Promise<void>;

abstract getPreferredAggregationTemporality(): AggregationTemporality;
getPreferredAggregationTemporality(): AggregationTemporality;

async shutdown(): Promise<void> {
if (this._shutdown) {
return;
}
shutdown(): Promise<void>;

// Setting _shutdown before flushing might prevent some exporters from flushing
// Waiting until flushing is complete might allow another flush to occur during shutdown
const flushPromise = this.forceFlush();
this._shutdown = true;
await flushPromise;
}

isShutdown() {
return this._shutdown;
}
}

export class ConsoleMetricExporter extends MetricExporter {
async export(_batch: MetricData[]) {
throw new Error('Method not implemented');
export class ConsoleMetricExporter implements PushMetricExporter {
protected _shutdown = true;

export(_batch: MetricData[], resultCallback: (result: ExportResult) => void) {
return resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Method not implemented')
});
}

getPreferredAggregationTemporality() {
Expand All @@ -58,4 +52,8 @@ export class ConsoleMetricExporter extends MetricExporter {

// nothing to do
async forceFlush() {}

async shutdown() {
this._shutdown = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/

import * as api from '@opentelemetry/api';
import { ExportResultCode, globalErrorHandler } from '@opentelemetry/core';
import { MetricReader } from './MetricReader';
import { MetricExporter } from './MetricExporter';
import { PushMetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';

export type PeriodicExportingMetricReaderOptions = {
exporter: MetricExporter
exporter: PushMetricExporter
exportIntervalMillis?: number,
exportTimeoutMillis?: number
};
Expand All @@ -32,7 +33,7 @@ export type PeriodicExportingMetricReaderOptions = {
export class PeriodicExportingMetricReader extends MetricReader {
private _interval?: ReturnType<typeof setInterval>;

private _exporter: MetricExporter;
private _exporter: PushMetricExporter;

private readonly _exportInterval: number;

Expand Down Expand Up @@ -62,7 +63,20 @@ export class PeriodicExportingMetricReader extends MetricReader {

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});
await this._exporter.export(metrics);
return new Promise((resolve, reject) => {
this._exporter.export(metrics, result => {
if (result.code !== ExportResultCode.SUCCESS) {
reject(
result.error ??
new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
)
);
} else {
resolve();
}
});
});
}

protected override onInitialized(): void {
Expand All @@ -76,7 +90,7 @@ export class PeriodicExportingMetricReader extends MetricReader {
return;
}

api.diag.error('Unexpected error during export: %s', err);
globalErrorHandler(err);
}
}, this._exportInterval);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,45 @@

import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { MetricExporter } from '../../src';
import { PushMetricExporter } from '../../src';
import { MetricData } from '../../src/export/MetricData';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { MetricProducer } from '../../src/export/MetricProducer';
import { TimeoutError } from '../../src/utils';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { assertRejects } from '../test-utils';

const MAX_32_BIT_INT = 2 ** 31 - 1;

class TestMetricExporter extends MetricExporter {
class TestMetricExporter implements PushMetricExporter {
public exportTime = 0;
public forceFlushTime = 0;
public throwException = false;
public failureResult = false;
private _batches: MetricData[][] = [];
private _shutdown: boolean = false;

async export(batch: MetricData[]): Promise<void> {
export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void {
this._batches.push(batch);

if (this.throwException) {
throw new Error('Error during export');
}
await new Promise(resolve => setTimeout(resolve, this.exportTime));
setTimeout(() => {
if (this.failureResult) {
resultCallback({code: ExportResultCode.FAILED, error: new Error('some error') });
} else {
resultCallback({code: ExportResultCode.SUCCESS });
}
}, this.exportTime);
}

async shutdown(): Promise<void> {
if (this._shutdown) return;
const flushPromise = this.forceFlush();
this._shutdown = true;
await flushPromise;
}

async forceFlush(): Promise<void> {
Expand Down Expand Up @@ -176,6 +192,24 @@ describe('PeriodicExportingMetricReader', () => {
await reader.shutdown();
});

it('should keep running on export failure', async () => {
const exporter = new TestMetricExporter();
exporter.failureResult = true;
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: 30,
exportTimeoutMillis: 20
});

reader.setMetricProducer(new TestMetricProducer());

const result = await exporter.waitForNumberOfExports(2);
assert.deepStrictEqual(result, [[], []]);

exporter.failureResult = false;
await reader.shutdown();
});

it('should keep exporting on export timeouts', async () => {
const exporter = new TestMetricExporter();
// set time longer than timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ import * as sinon from 'sinon';
import { MeterProvider } from '../../src';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { MetricData, PointDataType } from '../../src/export/MetricData';
import { MetricExporter } from '../../src/export/MetricExporter';
import { PushMetricExporter } from '../../src/export/MetricExporter';
import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState';
import { MetricCollector } from '../../src/state/MetricCollector';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util';
import { TestMetricReader } from '../export/TestMetricReader';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';

class TestMetricExporter extends MetricExporter {
class TestMetricExporter implements PushMetricExporter {
metricDataList: MetricData[] = [];
async export(batch: MetricData[]): Promise<void> {
async export(batch: MetricData[], resultCallback: (result: ExportResult) => void): Promise<void> {
this.metricDataList.push(...batch);
resultCallback({code: ExportResultCode.SUCCESS});
}

async shutdown(): Promise<void> {}

async forceFlush(): Promise<void> {}

getPreferredAggregationTemporality(): AggregationTemporality {
Expand Down Expand Up @@ -63,7 +67,8 @@ describe('MetricCollector', () => {
});

describe('collect', () => {
function setupInstruments(exporter: MetricExporter) {

function setupInstruments(exporter: PushMetricExporter) {
const meterProvider = new MeterProvider({ resource: defaultResource });

const reader = new TestMetricReader(exporter.getPreferredAggregationTemporality());
Expand Down

0 comments on commit 896d878

Please sign in to comment.