Skip to content

Commit

Permalink
Add stopWithFlush option to ffwd http reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
lmuhlha committed Jan 8, 2021
1 parent cb3bfbe commit 5fef347
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,16 @@ public void start() {
return;
}

scheduledFuture = executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
FastForwardHttpReporter.this.report();
} catch (final Exception e) {
log.error("Error when trying to report metrics", e);
}
scheduledFuture = executorService.scheduleWithFixedDelay(() -> {
try {
FastForwardHttpReporter.this.report();
} catch (final Exception e) {
log.error("Error when trying to report metrics", e);
}
}, 0, duration, unit);
}


public void stop() {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
Expand All @@ -440,6 +438,21 @@ public void stop() {
}
}

public void stopWithFlush() {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
if (executorOwner) {
executorService.shutdown();
}
try {
log.info("Final flush of metrics.");
report();
} catch (final Exception e) {
log.error("Error during final flush of metrics: ", e);
}
}

@Override
public void close() {
stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import static com.google.common.collect.ImmutableMap.of;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import com.codahale.metrics.Gauge;
Expand All @@ -15,14 +14,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import com.spotify.ffwd.http.model.v2.Batch;
import com.spotify.ffwd.http.HttpClient;
import com.spotify.ffwd.http.model.v2.Batch;
import com.spotify.ffwd.http.model.v2.Value;
import com.spotify.metrics.core.Distribution;
import com.spotify.metrics.core.MetricId;
import com.spotify.metrics.core.SemanticMetricBuilder;
import com.spotify.metrics.core.SemanticMetricDistribution;
import com.spotify.metrics.core.SemanticMetricFilter;
import com.spotify.metrics.core.SemanticMetricRegistry;
import com.spotify.metrics.ffwdhttp.Clock;
import com.spotify.metrics.ffwdhttp.FastForwardHttpReporter;
Expand All @@ -32,7 +29,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.jmock.lib.concurrent.DeterministicScheduler;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -66,7 +62,7 @@ public class FastForwardHttpReporterTest {
private SemanticMetricBuilder<SemanticMetricDistribution> semanticMetricBuilder;

private Map<String, String> commonTags;
DeterministicScheduler executorService;
private DeterministicScheduler executorService;

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -103,12 +99,7 @@ public void someReporting() {
registry.meter(MetricId.build("meter2"));
registry.timer(MetricId.build("timer"));
registry.register(MetricId.build("gauge").tagged("what", "some-gauge"),
new Gauge<Double>() {
@Override
public Double getValue() {
return 0D;
}
});
(Gauge<Double>) () -> 0D);

setupDistribution();

Expand Down Expand Up @@ -178,6 +169,9 @@ public Double getValue() {
final ArgumentCaptor<Batch> batch = ArgumentCaptor.forClass(Batch.class);

executorService.tick(REPORTING_PERIOD * 2 + 20, TimeUnit.MILLISECONDS);

reporter.stop();

verify(httpClient, atLeastOnce()).sendBatch(
batch.capture());
for (final Batch b : batch.getAllValues()) {
Expand Down Expand Up @@ -218,4 +212,131 @@ public void shouldAddExtractedTags() throws Exception {
final Map<String, String> commonTags = batch.getValue().getCommonTags();
assertEquals(ImmutableMap.of("foo", "bar", "bar", "baz"), commonTags);
}
}

@Test
public void someReportingNoTickWithFlush() {
doReturn(Observable.<Void>just(null)).when(httpClient).sendBatch(any(Batch.class));
fixedClock.setCurrentTime(TIME);
registry.counter(MetricId.build("counter"));
registry.derivingMeter(MetricId.build("deriving-meter"));
registry.histogram(MetricId.build("histogram"));
registry.meter(MetricId.build("meter").tagged("unit", "spec"));
registry.meter(MetricId.build("meter2"));
registry.timer(MetricId.build("timer"));
registry.register(MetricId.build("gauge").tagged("what", "some-gauge"),
(Gauge<Double>) () -> 0D);

setupDistribution();

final Set<Batch.Point> expected = new HashSet<>();
expected.add(new Batch.Point("prefix.distribution",
of("unit", "n", "stat", "distribution", "metric_type",
"distribution"), RESOURCE, Value.DistributionValue.create(VALUE_1), TIME));
expected.add(new Batch.Point("prefix.counter",
of("unit", "n", "stat", "count", "metric_type", "counter"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.deriving-meter",
of("unit", "n/s", "stat", "5m", "metric_type", "deriving-meter"), RESOURCE, VALUE_0,
TIME));
expected.add(new Batch.Point("prefix.deriving-meter",
of("unit", "n/s", "stat", "1m", "metric_type", "deriving-meter"), RESOURCE, VALUE_0,
TIME));
expected.add(new Batch.Point("prefix.histogram",
of("unit", "n", "stat", "max", "metric_type", "histogram"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.histogram",
of("unit", "n", "stat", "min", "metric_type", "histogram"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.histogram",
of("unit", "n", "stat", "mean", "metric_type", "histogram"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.histogram",
of("unit", "n", "stat", "p75", "metric_type", "histogram"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.histogram",
of("unit", "n", "stat", "median", "metric_type", "histogram"), RESOURCE, VALUE_0,
TIME));
expected.add(new Batch.Point("prefix.histogram",
of("unit", "n", "stat", "stddev", "metric_type", "histogram"), RESOURCE, VALUE_0,
TIME));
expected.add(new Batch.Point("prefix.histogram",
of("unit", "n", "stat", "p99", "metric_type", "histogram"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.meter",
of("unit", "spec", "stat", "count", "metric_type", "meter"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.meter",
of("unit", "spec/s", "stat", "1m", "metric_type", "meter"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.meter",
of("unit", "spec/s", "stat", "5m", "metric_type", "meter"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.meter2",
of("unit", "n", "stat", "count", "metric_type", "meter"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.meter2",
of("unit", "n/s", "stat", "1m", "metric_type", "meter"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.meter2",
of("unit", "n/s", "stat", "5m", "metric_type", "meter"), RESOURCE, VALUE_0, TIME));
expected.add(
new Batch.Point("prefix.timer", of("unit", "ns", "stat", "max", "metric_type", "timer"),
RESOURCE, VALUE_0, TIME));
expected.add(
new Batch.Point("prefix.timer", of("unit", "ns", "stat", "min", "metric_type", "timer"),
RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.timer",
of("unit", "ns", "stat", "mean", "metric_type", "timer"), RESOURCE, VALUE_0, TIME));
expected.add(
new Batch.Point("prefix.timer", of("unit", "ns", "stat", "p75", "metric_type", "timer"),
RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.timer",
of("unit", "ns", "stat", "median", "metric_type", "timer"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.timer",
of("unit", "ns", "stat", "stddev", "metric_type", "timer"), RESOURCE, VALUE_0, TIME));
expected.add(
new Batch.Point("prefix.timer", of("unit", "ns", "stat", "p99", "metric_type", "timer"),
RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.timer",
of("unit", "ns/s", "stat", "1m", "metric_type", "timer"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.timer",
of("unit", "ns/s", "stat", "5m", "metric_type", "timer"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.gauge",
of("what", "some-gauge", "unit", "n", "metric_type", "gauge"), RESOURCE, VALUE_0,
TIME));

reporter.start();

final ArgumentCaptor<Batch> batch = ArgumentCaptor.forClass(Batch.class);

reporter.stopWithFlush();

verify(httpClient, atLeastOnce()).sendBatch(
batch.capture());
for (final Batch b : batch.getAllValues()) {
assertEquals(commonTags, b.getCommonTags());
final Set<Batch.Point> points = new HashSet<>(b.getPoints());
points.removeAll(expected);
assertEquals("expected empty set of points", ImmutableSet.of(), points);
}
}

@Test
public void noReportingIfNoTick() {
doReturn(Observable.<Void>just(null)).when(httpClient).sendBatch(any(Batch.class));
fixedClock.setCurrentTime(TIME);
registry.counter(MetricId.build("counter"));
registry.derivingMeter(MetricId.build("deriving-meter"));

final Set<Batch.Point> expected = new HashSet<>();
expected.add(new Batch.Point("prefix.distribution",
of("unit", "n", "stat", "distribution", "metric_type",
"distribution"), RESOURCE, Value.DistributionValue.create(VALUE_1), TIME));
expected.add(new Batch.Point("prefix.counter",
of("unit", "n", "stat", "count", "metric_type", "counter"), RESOURCE, VALUE_0, TIME));
expected.add(new Batch.Point("prefix.deriving-meter",
of("unit", "n/s", "stat", "5m", "metric_type", "deriving-meter"), RESOURCE, VALUE_0,
TIME));
expected.add(new Batch.Point("prefix.deriving-meter",
of("unit", "n/s", "stat", "1m", "metric_type", "deriving-meter"), RESOURCE, VALUE_0,
TIME));

reporter.start();

final ArgumentCaptor<Batch> batch = ArgumentCaptor.forClass(Batch.class);

reporter.stop();

verify(httpClient, never()).sendBatch(
batch.capture());
}
}

0 comments on commit 5fef347

Please sign in to comment.