Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pending traces report in tracer flares #8053

Merged
merged 28 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
39fd9dc
adding tracerflare.addtext
mhlidd Dec 4, 2024
6663a16
adding class to handle tracerdump
mhlidd Dec 5, 2024
0e10cda
updating flare to implement reporter and store only root spans
mhlidd Dec 6, 2024
6e7fc17
updating PR comments
mhlidd Dec 11, 2024
de9165b
rebasing
mhlidd Dec 16, 2024
23bc65a
changing getSpans header
mhlidd Dec 16, 2024
bf1763b
Adding DumpElement, DumpDrain, and DumpSupplier to extract PendingTraces
mhlidd Dec 17, 2024
907a0aa
addressing PR comments
mhlidd Dec 19, 2024
c76ade1
limiting number of tracers in tracer flare and sorting by oldest first
mhlidd Dec 20, 2024
5638d0c
removing unused log
mhlidd Dec 20, 2024
a0bd565
updating comparator
mhlidd Dec 20, 2024
0008ad1
saving changes
mhlidd Jan 9, 2025
b6b7c0d
initial implementation of test
mhlidd Jan 10, 2025
b3e9048
updating test
mhlidd Jan 13, 2025
713ccda
feat(core): Use prepare for flare to signal dump element
PerfectSlayer Jan 15, 2025
0ee90c1
feat(core): Update test prevent the span to be written early
PerfectSlayer Jan 15, 2025
bc71dcf
fix(core): Fix tests
PerfectSlayer Jan 16, 2025
60d08e7
fix(core): Reduce scope
PerfectSlayer Jan 21, 2025
60bb162
fix(core): Revert drain limit
PerfectSlayer Jan 21, 2025
3382b3d
feat(core): Refactor action elements
PerfectSlayer Jan 21, 2025
fbed357
adding support for json encoding of traces
mhlidd Jan 22, 2025
1dee7f3
cleanup
mhlidd Jan 22, 2025
af57f32
renaming file
mhlidd Jan 22, 2025
e9eac71
making TraceDumpJsonExporter a Writer
mhlidd Jan 23, 2025
9786fb5
nit changes
mhlidd Jan 28, 2025
3d0e3f7
updating test to match changes
mhlidd Jan 28, 2025
b443ddd
addressing PR comments
mhlidd Jan 29, 2025
3b1c41a
final unit tests changes
mhlidd Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dd-trace-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ excludedClassesCoverage += [
'datadog.trace.common.writer.RemoteMapper.NoopRemoteMapper',
'datadog.trace.core.monitor.DDAgentStatsDConnection',
'datadog.trace.core.monitor.LoggingStatsDClient',
'datadog.trace.core.PendingTraceBuffer.DelayingPendingTraceBuffer.FlushElement',
'datadog.trace.core.PendingTraceBuffer.DelayingPendingTraceBuffer.CommandElement',
'datadog.trace.core.StatusLogger',
// covered with CI Visibility smoke tests
'datadog.trace.core.StreamingTraceCollector',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package datadog.trace.common.writer;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.core.DDSpan;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.zip.ZipOutputStream;

public class TraceDumpJsonExporter implements Writer {

private static final JsonAdapter<Collection<DDSpan>> TRACE_ADAPTER =
new Moshi.Builder()
.add(DDSpanJsonAdapter.buildFactory(false))
.build()
.adapter(Types.newParameterizedType(Collection.class, DDSpan.class));
private StringBuilder dumpText;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Code Quality Violation

StringBuilder can lead to memory leaks in long lasting classes (...read more)

StringBuffers and StringBuilders have the potential to grow significantly, which could lead to memory leaks if they are retained within objects with extended lifetimes.

View in Datadog  Leave us feedback  Documentation

private ZipOutputStream zip;

public TraceDumpJsonExporter(ZipOutputStream zip) {
this.zip = zip;
dumpText = new StringBuilder();
}

public void write(final Collection<DDSpan> trace) {
dumpText.append(TRACE_ADAPTER.toJson(trace));
dumpText.append('\n');
}

@Override
public void write(List<DDSpan> trace) {
// Do nothing
}

@Override
public void start() {
// do nothing
}

@Override
public boolean flush() {
try {
TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString());
} catch (IOException e) {
// do nothing
}
return true;
}

@Override
public void close() {
// do nothing
}

@Override
public void incrementDropCounts(int spanCount) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import datadog.trace.core.CoreTracer.ConfigSnapshot;
import datadog.trace.core.monitor.HealthMetrics;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -448,4 +449,8 @@ public static long getDurationNano(CoreSpan<?> span) {
PendingTrace trace = (PendingTrace) traceCollector;
return trace.getLastWriteTime() - span.getStartTime();
}

Collection<DDSpan> getSpans() {
return spans;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.Comparator.comparingLong;

import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.api.time.TimeSource;
import datadog.trace.common.writer.TraceDumpJsonExporter;
import datadog.trace.core.monitor.HealthMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.zip.ZipOutputStream;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
Expand Down Expand Up @@ -47,13 +56,16 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500);
private static final long SLEEP_TIME_MS = 100;
private static final CommandElement FLUSH_ELEMENT = new CommandElement();
private static final CommandElement DUMP_ELEMENT = new CommandElement();

private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
private final TimeSource timeSource;

private volatile boolean closed = false;
private final AtomicInteger flushCounter = new AtomicInteger(0);
private final AtomicInteger dumpCounter = new AtomicInteger(0);

private final LongRunningTracesTracker runningTracesTracker;

Expand All @@ -78,6 +90,7 @@ public void enqueue(Element pendingTrace) {

@Override
public void start() {
TracerFlare.addReporter(new TracerDump(this));
worker.start();
}

Expand Down Expand Up @@ -108,10 +121,10 @@ public void flush() {
if (worker.isAlive()) {
int count = flushCounter.get();
int loop = 1;
boolean signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
boolean signaled = queue.offer(FLUSH_ELEMENT);
while (!closed && !signaled) {
yieldOrSleep(loop++);
signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
signaled = queue.offer(FLUSH_ELEMENT);
}
int newCount = flushCounter.get();
while (!closed && count >= newCount) {
Expand All @@ -130,9 +143,48 @@ public void accept(Element pendingTrace) {
}
}

private static final class FlushElement implements Element {
static FlushElement FLUSH_ELEMENT = new FlushElement();
private static final class DumpDrain
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
private static final int MAX_DUMPED_TRACES = 50;

private static final Comparator<Element> TRACE_BY_START_TIME =
comparingLong(trace -> trace.getRootSpan().getStartTime());
private static final Predicate<Element> NOT_PENDING_TRACE =
element -> !(element instanceof PendingTrace);

private volatile List<Element> DATA = new ArrayList<>();
mhlidd marked this conversation as resolved.
Show resolved Hide resolved
private int index = 0;

@Override
public void accept(Element pendingTrace) {
DATA.add(pendingTrace);
}

@Override
public Element get() {
if (index < DATA.size()) {
return DATA.get(index++);
}
return null; // Should never reach here or else queue may break according to
// MessagePassingQueue docs
}

public List<Element> collectTraces() {
DATA.removeIf(NOT_PENDING_TRACE);
// Storing oldest traces first
DATA.sort(TRACE_BY_START_TIME.reversed());
List<Element> orderedTraces = DATA;
DATA = new ArrayList<>();
return orderedTraces;
mhlidd marked this conversation as resolved.
Show resolved Hide resolved
}

public void clear() {
DATA.clear();
}
mhlidd marked this conversation as resolved.
Show resolved Hide resolved
}

private static final class CommandElement implements Element {
@Override
public long oldestFinishedTime() {
return 0;
Expand Down Expand Up @@ -180,13 +232,21 @@ public void run() {
pendingTrace = queue.take(); // block until available;
}

if (pendingTrace instanceof FlushElement) {
if (pendingTrace == FLUSH_ELEMENT) {
// Since this is an MPSC queue, the drain needs to be called on the consumer thread
queue.drain(WriteDrain.WRITE_DRAIN);
flushCounter.incrementAndGet();
continue;
}

if (pendingTrace == DUMP_ELEMENT) {
queue.fill(
DumpDrain.DUMP_DRAIN,
queue.drain(DumpDrain.DUMP_DRAIN, DumpDrain.MAX_DUMPED_TRACES));
dumpCounter.incrementAndGet();
continue;
}

// The element is no longer in the queue
pendingTrace.setEnqueued(false);

Expand All @@ -208,7 +268,7 @@ public void run() {
// Trace has been unmodified long enough, go ahead and write whatever is finished.
pendingTrace.write();
} else {
// Trace is too new. Requeue it and sleep to avoid a hot loop.
// Trace is too new. Requeue it and sleep to avoid a hot loop.
enqueue(pendingTrace);
Thread.sleep(SLEEP_TIME_MS);
}
Expand Down Expand Up @@ -277,4 +337,42 @@ public static PendingTraceBuffer discarding() {
public abstract void flush();

public abstract void enqueue(Element pendingTrace);

private static class TracerDump implements TracerFlare.Reporter {
private final DelayingPendingTraceBuffer buffer;

private TracerDump(DelayingPendingTraceBuffer buffer) {
this.buffer = buffer;
}

@Override
public void prepareForFlare() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
buffer.yieldOrSleep(loop++);
newCount = buffer.dumpCounter.get();
}
}
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip);
for (Element e : DelayingPendingTraceBuffer.DumpDrain.DUMP_DRAIN.collectTraces()) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(trace.getSpans());
}
}
writer.flush();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class LongRunningTracesTrackerTest extends DDSpecification {

PendingTrace newTraceToTrack() {
PendingTrace trace = factory.create(DDTraceId.ONE)
PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP)
PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP, 0)
return trace
}

Expand Down
Loading
Loading