Skip to content

Commit

Permalink
Timer handlers (#2856)
Browse files Browse the repository at this point in the history
* Initial timer listener proof-of-concept

Adapts the `Timer.Sample` to make the callbacks on the `TimerRecordingListener` for `onStart`, `onError`, `onStop`. Being the initial pass, this tries to keep things as minimal as possible. Even the `onError` callback could probably have been initially omitted since we should work on making error tagging more of a first-class citizen in Micrometer. Then using an `onError` callback will be more natural.

* Add a test with a listener implementation that makes Brave Spans

Proves the listener with one implementation that makes spans from the timer recording.

* Leave some TODO comments to consider

* Runnable instrumentation test for TimerRecordingListenerTest

Tests that the trace context is correct when a runnable is executed on another thread.

* Add config to boot2-reactive sample using TimerRecordingListener

Uses the BraveTimerRecordingListener previously used in a unit test in the boot2-reactive sample application. The current Spring Boot instrumentation for WebFlux does not use the Timer start/stop methods, so the instrumentation code was copied and modified with the corresponding auto-configuration excluded. With the change to use start/stop, the listener is making spans as expected.

However, the trace context is not being propagated into or out of the process - trace headers passed to the application are not used, and trace headers are not sent on calls outside the application.

* Assert parent context inside runnable test

* Make TimerRecordingListener generic

* ADded tracing stuff

* Added cardinality and tracing contexts

* Hacking Timer and MeterRegistry to store samples

* Updated the context and setting of current samples

* Tags provider WIP

* Added simple test utility API

* Made context a tags provider

* Added support for TCKs for listeners

* Reusing micrometer-metrics/tracing + polish

* Listener -> Handler, Context -> HandlerContext

* Added First and All matching composite timer recording handlers

* Fixing build config so that we can resolve micrometer-tracing

* Introduce timeout for Jersey tests
There is some kind of locking/timing issue with the Jersey tests, this change introduces timeouts so the build will have a chance to fail. Without this the build never finishes.

* Fix'

* Guard against null currentSample

Instrumentation may switch threads without updating the current sample. This avoids the NPE in that situation and instead issues a warning log on first occurrence.

* Remove micrometer-tracing references

* Always sets a context

Co-authored-by: Tommy Ludwig <[email protected]>
Co-authored-by: Jonatan Ivanov <[email protected]>
  • Loading branch information
3 people authored Nov 15, 2021
1 parent f83cc85 commit 29c3400
Show file tree
Hide file tree
Showing 41 changed files with 2,557 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@
*/
package io.micrometer.core.aop;

import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;

import io.micrometer.core.annotation.Incubating;
import io.micrometer.core.annotation.Timed;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.lang.NonNullApi;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;

import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* <p>
* AspectJ aspect for intercepting types or methods annotated with {@link Timed @Timed}.<br>
Expand Down Expand Up @@ -206,8 +206,7 @@ private void record(ProceedingJoinPoint pjp, Timed timed, String metricName, Tim
.tags(EXCEPTION_TAG, exceptionClass)
.tags(tagsBasedOnJoinPoint.apply(pjp))
.publishPercentileHistogram(timed.histogram())
.publishPercentiles(timed.percentiles().length == 0 ? null : timed.percentiles())
.register(registry));
.publishPercentiles(timed.percentiles().length == 0 ? null : timed.percentiles()));
} catch (Exception e) {
// ignoring on purpose
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,18 @@
*/
package io.micrometer.core.instrument;

import io.micrometer.core.instrument.distribution.*;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.Histogram;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.NoopHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowFixedBoundaryHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowPercentileHistogram;
import io.micrometer.core.instrument.distribution.pause.ClockDriftPauseDetector;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.util.MeterEquivalence;
Expand All @@ -24,12 +35,6 @@
import org.LatencyUtils.SimplePauseDetector;
import org.LatencyUtils.TimeCappedMovingAverageIntervalEstimator;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public abstract class AbstractTimer extends AbstractMeter implements Timer {
private static Map<PauseDetector, org.LatencyUtils.PauseDetector> pauseDetectorCache =
new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
*/
package io.micrometer.core.instrument;

import java.time.Duration;
import java.util.Arrays;

import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.lang.Nullable;

import java.time.Duration;
import java.util.Arrays;

/**
* Base builder for {@link Timer}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package io.micrometer.core.instrument;

import io.micrometer.core.lang.Nullable;

import java.util.Objects;

import io.micrometer.core.lang.Nullable;

import static java.util.Objects.requireNonNull;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,23 @@
import io.micrometer.core.instrument.search.Search;
import io.micrometer.core.instrument.util.TimeUtils;
import io.micrometer.core.lang.Nullable;
import io.micrometer.core.util.internal.logging.InternalLogger;
import io.micrometer.core.util.internal.logging.InternalLoggerFactory;
import io.micrometer.core.util.internal.logging.WarnThenDebugLogger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -76,15 +82,23 @@
* @author Johnny Lim
*/
public abstract class MeterRegistry {

private static final InternalLogger LOG = InternalLoggerFactory.getInstance(MeterRegistry.class);
private static final WarnThenDebugLogger warnThenDebugLogger = new WarnThenDebugLogger(MeterRegistry.class);

protected final Clock clock;
private final Object meterMapLock = new Object();
private volatile MeterFilter[] filters = new MeterFilter[0];
private final List<TimerRecordingHandler<?>> timerRecordingHandlers = new CopyOnWriteArrayList<>();
private final List<Consumer<Meter>> meterAddedListeners = new CopyOnWriteArrayList<>();
private final List<Consumer<Meter>> meterRemovedListeners = new CopyOnWriteArrayList<>();
private final List<BiConsumer<Meter.Id, String>> meterRegistrationFailedListeners = new CopyOnWriteArrayList<>();
private final Config config = new Config();
private final More more = new More();

private final ThreadLocal<Timer.Sample> threadLocalRecordings = new ThreadLocal<>();
private final Deque<Timer.Sample> recordings = new LinkedBlockingDeque<>();

// Even though writes are guarded by meterMapLock, iterators across value space are supported
// Hence, we use CHM to support that iteration without ConcurrentModificationException risk
private final Map<Id, Meter> meterMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -114,6 +128,60 @@ protected MeterRegistry(Clock clock) {
this.clock = clock;
}

public void setCurrentSample(@Nullable Timer.Sample recording) {
Timer.Sample old = this.threadLocalRecordings.get();
if (old == recording) {
return;
}
if (old != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Putting current recording [" + old + "] on stack");
}
this.recordings.addFirst(old);
}
this.threadLocalRecordings.set(recording);
if (LOG.isTraceEnabled()) {
LOG.trace("Current recording is [" + this.threadLocalRecordings.get() + "]");
}
}

/**
* Returns the current interval recording.
*
* @return currently stored recording
*/
public Timer.Sample getCurrentSample() {
return this.threadLocalRecordings.get();
}

/**
* Removes the current span from thread local and brings back the previous span
* to the current thread local.
*/
public void removeCurrentSample(Timer.Sample sample) {
Timer.Sample current = this.threadLocalRecordings.get();
if (!sample.equals(current)) {
warnThenDebugLogger.log("Sample [" + sample + "] is not the same as the one currently in thread local [" + current + "]. This is caused by a mistake in the instrumentation.");
return;
}
this.threadLocalRecordings.remove();
if (this.recordings.isEmpty()) {
return;
}
try {
Timer.Sample first = this.recordings.removeFirst();
this.threadLocalRecordings.set(first);
if (LOG.isTraceEnabled()) {
LOG.trace("Took recording [" + current + "] from thread local and set previous one from stack [" + first + "] back as current");
}
} catch (NoSuchElementException ex) {
if (LOG.isTraceEnabled()) {
LOG.trace("Failed to remove a recording from the queue", ex);
}
}
}


/**
* Build a new gauge to be added to the registry. This is guaranteed to only be called if the gauge doesn't already exist.
*
Expand Down Expand Up @@ -799,6 +867,24 @@ public Config onMeterRegistrationFailed(BiConsumer<Id, String> meterRegistration
return this;
}

/**
* Register an event listener for {@link Timer} recordings made using {@link Timer#start(MeterRegistry)}
* and {@link io.micrometer.core.instrument.Timer.Sample#stop(Timer)} methods. You can add arbitrary behavior
* in the callbacks provided to get additional behavior out of timing instrumentation.
*
* @param listener listener to add to the current configuration
* @return This configuration instance
*/
public Config timerRecordingListener(TimerRecordingHandler<?> listener) {
timerRecordingHandlers.add(listener);
return this;
}

// package-private for minimal visibility
Collection<TimerRecordingHandler<?>> getTimerRecordingListeners() {
return timerRecordingHandlers;
}

/**
* Use the provided naming convention, overriding the default for your monitoring system.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2017 VMware, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.core.instrument;

/**
* A provider of tags.
*
* @author Marcin Grzejszczak
*/
public interface TagsProvider {

default Tags getLowCardinalityTags() {
return Tags.empty();
}

default Tags getHighCardinalityTags() {
return Tags.empty();
}

default Tags getAllTags() {
return Tags.concat(getLowCardinalityTags(), getHighCardinalityTags());
}
}
Loading

0 comments on commit 29c3400

Please sign in to comment.