Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: batch observer to be independent from metric types #1709

Merged
merged 6 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/metrics/metrics/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const cpuUsageMetric = meter.createValueObserver('cpu_usage_per_app', {
description: 'Example of sync value observer used with async batch observer',
});

meter.createBatchObserver('metric_batch_observer', (observerBatchResult) => {
meter.createBatchObserver((observerBatchResult) => {
Promise.all([
someAsyncMetrics(),
// simulate waiting
Expand Down
7 changes: 2 additions & 5 deletions packages/opentelemetry-api/src/metrics/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
Counter,
ValueRecorder,
ValueObserver,
BatchObserver,
BatchMetricOptions,
UpDownCounter,
} from './Metric';
Expand Down Expand Up @@ -82,15 +81,13 @@ export interface Meter {
): ValueObserver;

/**
* Creates a new `BatchObserver` metric, can be used to update many metrics
* Creates a new `BatchObserver`, can be used to update many observer metrics
* at the same time and when operations needs to be async
* @param name the name of the metric.
* @param callback the batch observer callback
* @param [options] the metric batch options.
*/
createBatchObserver(
name: string,
callback: (batchObserverResult: BatchObserverResult) => void,
options?: BatchMetricOptions
): BatchObserver;
): void;
}
3 changes: 0 additions & 3 deletions packages/opentelemetry-api/src/metrics/Metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,6 @@ export type UpDownSumObserver = BaseObserver;
/** Base interface for the SumObserver metrics. */
export type SumObserver = BaseObserver;

/** Base interface for the Batch Observer metrics. */
export type BatchObserver = Metric;

/**
* key-value pairs passed by the user.
*/
Expand Down
12 changes: 2 additions & 10 deletions packages/opentelemetry-api/src/metrics/NoopMeter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
Counter,
ValueRecorder,
ValueObserver,
BatchObserver,
UpDownCounter,
BaseObserver,
} from './Metric';
Expand Down Expand Up @@ -90,10 +89,9 @@ export class NoopMeter implements Meter {
* @param callback the batch observer callback
*/
createBatchObserver(
_name: string,
_callback: (batchObserverResult: BatchObserverResult) => void
): BatchObserver {
return NOOP_BATCH_OBSERVER_METRIC;
): void {
obecny marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}

Expand Down Expand Up @@ -158,10 +156,6 @@ export class NoopBaseObserverMetric
}
}

export class NoopBatchObserverMetric
extends NoopMetric<void>
implements BatchObserver {}

export class NoopBoundCounter implements BoundCounter {
add(_value: number): void {
return;
Expand Down Expand Up @@ -203,5 +197,3 @@ export const NOOP_UP_DOWN_SUM_OBSERVER_METRIC = new NoopBaseObserverMetric(
export const NOOP_SUM_OBSERVER_METRIC = new NoopBaseObserverMetric(
NOOP_BOUND_BASE_OBSERVER
);

export const NOOP_BATCH_OBSERVER_METRIC = new NoopBatchObserverMetric();
2 changes: 1 addition & 1 deletion packages/opentelemetry-metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ const MemUsageMetric = meter.createValueObserver('mem_usage_per_app', {
description: 'Memory',
});

meter.createBatchObserver('metric_batch_observer', (observerBatchResult) => {
meter.createBatchObserver((observerBatchResult) => {
getSomeAsyncMetrics().then(metrics => {
observerBatchResult.observe({ app: 'myApp' }, [
cpuUsageMetric.observation(metrics.value1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,32 @@
*/

import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { Logger, NoopLogger } from '@opentelemetry/api';
import { BatchObserverResult } from './BatchObserverResult';
import { BoundObserver } from './BoundInstrument';
import { Batcher } from './export/Batcher';
import { MetricKind, MetricRecord } from './export/types';
import { Metric } from './Metric';
import { MetricRecord } from './export/types';

const NOOP_CALLBACK = () => {};
const MAX_TIMEOUT_UPDATE_MS = 500;

/** This is a SDK implementation of Batch Observer Metric. */
export class BatchObserverMetric
extends Metric<BoundObserver>
implements api.BatchObserver {
export class BatchObserver {
private _callback: (observerResult: api.BatchObserverResult) => void;
private _maxTimeoutUpdateMS: number;
private _logger: Logger;

constructor(
name: string,
options: api.BatchMetricOptions,
private readonly _batcher: Batcher,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
callback?: (observerResult: api.BatchObserverResult) => void
) {
super(
name,
options,
MetricKind.BATCH_OBSERVER,
resource,
instrumentationLibrary
);
this._logger = options.logger ?? new NoopLogger();
this._maxTimeoutUpdateMS =
options.maxTimeoutUpdateMS ?? MAX_TIMEOUT_UPDATE_MS;
this._callback = callback || NOOP_CALLBACK;
}

protected _makeInstrument(labels: api.Labels): BoundObserver {
return new BoundObserver(
labels,
this._disabled,
this._valueType,
this._logger,
this._batcher.aggregatorFor(this._descriptor)
);
}

getMetricRecord(): Promise<MetricRecord[]> {
collect(): Promise<MetricRecord[]> {
this._logger.debug('getMetricRecord - start');
return new Promise((resolve, reject) => {
return new Promise(resolve => {
const observerResult = new BatchObserverResult();

// cancels after MAX_TIMEOUT_MS - no more waiting for results
Expand All @@ -74,14 +49,14 @@ export class BatchObserverMetric
// remove callback to prevent user from updating the values later if
// for any reason the observerBatchResult will be referenced
observerResult.onObserveCalled();
super.getMetricRecord().then(resolve, reject);
resolve();
this._logger.debug('getMetricRecord - timeout');
}, this._maxTimeoutUpdateMS);

// sets callback for each "observe" method
observerResult.onObserveCalled(() => {
clearTimeout(timer);
super.getMetricRecord().then(resolve, reject);
resolve();
this._logger.debug('getMetricRecord - end');
});

Expand Down
49 changes: 11 additions & 38 deletions packages/opentelemetry-metrics/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
import * as api from '@opentelemetry/api';
import { ConsoleLogger, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BatchObserverMetric } from './BatchObserverMetric';
import { BatchObserver } from './BatchObserver';
import { BaseBoundInstrument } from './BoundInstrument';
import { MetricKind } from './export/types';
import { UpDownCounterMetric } from './UpDownCounterMetric';
import { CounterMetric } from './CounterMetric';
import { UpDownSumObserverMetric } from './UpDownSumObserverMetric';
Expand All @@ -37,6 +36,7 @@ import { NoopExporter } from './export/NoopExporter';
*/
export class Meter implements api.Meter {
private readonly _logger: api.Logger;
private readonly _batchObservers: BatchObserver[] = [];
private readonly _metrics = new Map<string, Metric<BaseBoundInstrument>>();
private readonly _batcher: Batcher;
private readonly _resource: Resource;
Expand Down Expand Up @@ -258,35 +258,20 @@ export class Meter implements api.Meter {

/**
* Creates a new batch observer metric.
* @param name the name of the metric.
* @param callback the batch observer callback
* @param [options] the metric batch options.
*/
createBatchObserver(
name: string,
callback: (observerResult: api.BatchObserverResult) => void,
options: api.BatchMetricOptions = {}
obecny marked this conversation as resolved.
Show resolved Hide resolved
): api.BatchObserver {
if (!this._isValidName(name)) {
this._logger.warn(
`Invalid metric name ${name}. Defaulting to noop metric implementation.`
);
return api.NOOP_BATCH_OBSERVER_METRIC;
}
) {
obecny marked this conversation as resolved.
Show resolved Hide resolved
const opt: api.BatchMetricOptions = {
logger: this._logger,
...DEFAULT_METRIC_OPTIONS,
obecny marked this conversation as resolved.
Show resolved Hide resolved
...options,
};
const batchObserver = new BatchObserverMetric(
name,
opt,
this._batcher,
this._resource,
this._instrumentationLibrary,
callback
);
this._registerMetric(name, batchObserver);
const batchObserver = new BatchObserver(opt, callback);
this._batchObservers.push(batchObserver);
return batchObserver;
}

Expand All @@ -299,27 +284,15 @@ export class Meter implements api.Meter {
*/
async collect(): Promise<void> {
// call batch observers first
const batchObservers = Array.from(this._metrics.values())
.filter(metric => {
return metric.getKind() === MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
});
await Promise.all(batchObservers).then(records => {
records.forEach(metrics => {
metrics.forEach(metric => this._batcher.process(metric));
});
const observations = this._batchObservers.map(observer => {
return observer.collect();
});
await Promise.all(observations);

// after this all remaining metrics can be run
const metrics = Array.from(this._metrics.values())
.filter(metric => {
return metric.getKind() !== MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
});
const metrics = Array.from(this._metrics.values()).map(metric => {
return metric.getMetricRecord();
});

await Promise.all(metrics).then(records => {
records.forEach(metrics => {
Expand Down
Loading