Add pending traces report in tracer flares #8053

Merged: 28 commits, Jan 30, 2025
39fd9dc
adding tracerflare.addtext
mhlidd Dec 4, 2024
6663a16
adding class to handle tracerdump
mhlidd Dec 5, 2024
0e10cda
updating flare to implement reporter and store only root spans
mhlidd Dec 6, 2024
6e7fc17
updating PR comments
mhlidd Dec 11, 2024
de9165b
rebasing
mhlidd Dec 16, 2024
23bc65a
changing getSpans header
mhlidd Dec 16, 2024
bf1763b
Adding DumpElement, DumpDrain, and DumpSupplier to extract PendingTraces
mhlidd Dec 17, 2024
907a0aa
addressing PR comments
mhlidd Dec 19, 2024
c76ade1
limiting number of tracers in tracer flare and sorting by oldest first
mhlidd Dec 20, 2024
5638d0c
removing unused log
mhlidd Dec 20, 2024
a0bd565
updating comparator
mhlidd Dec 20, 2024
0008ad1
saving changes
mhlidd Jan 9, 2025
b6b7c0d
initial implementation of test
mhlidd Jan 10, 2025
b3e9048
updating test
mhlidd Jan 13, 2025
713ccda
feat(core): Use prepare for flare to signal dump element
PerfectSlayer Jan 15, 2025
0ee90c1
feat(core): Update test prevent the span to be written early
PerfectSlayer Jan 15, 2025
bc71dcf
fix(core): Fix tests
PerfectSlayer Jan 16, 2025
60d08e7
fix(core): Reduce scope
PerfectSlayer Jan 21, 2025
60bb162
fix(core): Revert drain limit
PerfectSlayer Jan 21, 2025
3382b3d
feat(core): Refactor action elements
PerfectSlayer Jan 21, 2025
fbed357
adding support for json encoding of traces
mhlidd Jan 22, 2025
1dee7f3
cleanup
mhlidd Jan 22, 2025
af57f32
renaming file
mhlidd Jan 22, 2025
e9eac71
making TraceDumpJsonExporter a Writer
mhlidd Jan 23, 2025
9786fb5
nit changes
mhlidd Jan 28, 2025
3d0e3f7
updating test to match changes
mhlidd Jan 28, 2025
b443ddd
addressing PR comments
mhlidd Jan 29, 2025
3b1c41a
final unit tests changes
mhlidd Jan 30, 2025
2 changes: 1 addition & 1 deletion dd-trace-core/build.gradle
@@ -20,7 +20,7 @@ excludedClassesCoverage += [
'datadog.trace.common.writer.RemoteMapper.NoopRemoteMapper',
'datadog.trace.core.monitor.DDAgentStatsDConnection',
'datadog.trace.core.monitor.LoggingStatsDClient',
'datadog.trace.core.PendingTraceBuffer.DelayingPendingTraceBuffer.FlushElement',
'datadog.trace.core.PendingTraceBuffer.DelayingPendingTraceBuffer.CommandElement',
'datadog.trace.core.StatusLogger',
// covered with CI Visibility smoke tests
'datadog.trace.core.StreamingTraceCollector',
@@ -0,0 +1,55 @@
package datadog.trace.common.writer;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.core.DDSpan;
import java.io.IOException;
import java.util.List;
import java.util.zip.ZipOutputStream;

public class TraceDumpJsonExporter implements Writer {

private StringBuilder dumpText;
Contributor:
🟠 Code Quality Violation: StringBuilder can lead to memory leaks in long-lasting classes. StringBuffers and StringBuilders have the potential to grow significantly, which could lead to memory leaks if they are retained within objects with extended lifetimes.
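The static-analysis concern is about a grown StringBuilder being retained for the lifetime of a long-lived object. In this exporter the builder lives only for one flare, but the general mitigation is to drop the grown backing array once the buffered text has been consumed. A minimal sketch of that pattern (class and method names here are illustrative, not part of dd-trace-java):

```java
// Sketch: release the grown StringBuilder after each flush instead of
// retaining it, so the backing array can be garbage collected.
class Report {
    private StringBuilder buf = new StringBuilder();

    void append(String line) {
        buf.append(line).append('\n');
    }

    String flush() {
        String out = buf.toString();
        buf = new StringBuilder(); // drop the grown array
        return out;
    }
}
```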

private ZipOutputStream zip;
private static final JsonAdapter<List<DDSpan>> TRACE_ADAPTER =
new Moshi.Builder()
.add(DDSpanJsonAdapter.buildFactory(false))
.build()
.adapter(Types.newParameterizedType(List.class, DDSpan.class));
Contributor:
Any reason we don't use Collection.class here, which would then let us use public void write(final Collection<DDSpan> trace), or, to keep it a bit more specific, public void write(final Deque<DDSpan> trace), and avoid the need to copy span data into a temporary ArrayList just to fit the expected interface?

Contributor Author (mhlidd):
Initially it was because public void write(final List<DDSpan> trace) has to be implemented regardless in order to satisfy the Writer interface that declares the method. I could override that method with a dummy implementation and overload a new public void write(final Deque<DDSpan> trace) that holds the actual implementation instead.
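The overload approach described above can be sketched as follows. This is a simplified stand-in, not the actual dd-trace-java types: Writer is reduced to one method and spans are plain Strings, purely to show how the Deque overload avoids the defensive ArrayList copy:

```java
import java.util.Deque;
import java.util.List;

// Simplified stand-in for the tracer's Writer interface.
interface Writer {
    void write(List<String> trace);
}

class TraceDumpExporter implements Writer {
    private final StringBuilder dumpText = new StringBuilder();

    // Required by the Writer interface; the flare path never calls it.
    @Override
    public void write(List<String> trace) {
        writeAll(trace);
    }

    // Overload that accepts the PendingTrace span Deque directly,
    // avoiding a copy into a temporary ArrayList.
    public void write(Deque<String> trace) {
        writeAll(trace);
    }

    private void writeAll(Iterable<String> trace) {
        for (String span : trace) {
            dumpText.append(span).append('\n');
        }
    }

    String dump() {
        return dumpText.toString();
    }
}
```

Java overload resolution picks the Deque variant at compile time for any Deque-typed argument, so the ArrayList copy in addReportToFlare would no longer be needed.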


public TraceDumpJsonExporter(ZipOutputStream zip) {
this.zip = zip;
dumpText = new StringBuilder();
}

@Override
public void write(final List<DDSpan> trace) {
dumpText.append(TRACE_ADAPTER.toJson(trace));
dumpText.append('\n');
}

@Override
public void start() {
// do nothing
}

@Override
public boolean flush() {
try {
TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString());
} catch (IOException e) {
// do nothing
}
return true;
}

@Override
public void close() {
// do nothing
}

@Override
public void incrementDropCounts(int spanCount) {}
}
@@ -7,6 +7,7 @@
import datadog.trace.core.CoreTracer.ConfigSnapshot;
import datadog.trace.core.monitor.HealthMetrics;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
@@ -448,4 +449,8 @@ public static long getDurationNano(CoreSpan<?> span) {
PendingTrace trace = (PendingTrace) traceCollector;
return trace.getLastWriteTime() - span.getStartTime();
}

Collection<DDSpan> getSpans() {
return spans;
}
}
@@ -3,13 +3,22 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.Comparator.comparingLong;

import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.api.time.TimeSource;
import datadog.trace.common.writer.TraceDumpJsonExporter;
import datadog.trace.core.monitor.HealthMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.zip.ZipOutputStream;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
@@ -47,13 +56,16 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500);
private static final long SLEEP_TIME_MS = 100;
private static final CommandElement FLUSH_ELEMENT = new CommandElement();
private static final CommandElement DUMP_ELEMENT = new CommandElement();

private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
private final TimeSource timeSource;

private volatile boolean closed = false;
private final AtomicInteger flushCounter = new AtomicInteger(0);
private final AtomicInteger dumpCounter = new AtomicInteger(0);

private final LongRunningTracesTracker runningTracesTracker;

@@ -78,6 +90,7 @@ public void enqueue(Element pendingTrace) {

@Override
public void start() {
TracerFlare.addReporter(new TracerDump(this));
worker.start();
}

@@ -108,10 +121,10 @@ public void flush() {
if (worker.isAlive()) {
int count = flushCounter.get();
int loop = 1;
boolean signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
boolean signaled = queue.offer(FLUSH_ELEMENT);
while (!closed && !signaled) {
yieldOrSleep(loop++);
signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
signaled = queue.offer(FLUSH_ELEMENT);
}
int newCount = flushCounter.get();
while (!closed && count >= newCount) {
@@ -130,9 +143,28 @@ public void accept(Element pendingTrace) {
}
}

private static final class FlushElement implements Element {
static FlushElement FLUSH_ELEMENT = new FlushElement();
private static final class DumpDrain
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
private static final List<Element> DATA = new ArrayList<>();
Contributor:
Can you make DATA an instance field?

You could then introduce a method called List<Element> collectTraces() which removes non-pending-trace elements, sorts by oldest trace first, and returns the data. You could also add a reset method to clear the recorded data, which avoids exposing the internal collection, although note that would not compact the underlying array.

You might want to consider making the data field volatile; the collectTraces method could then:

  • store the current data reference in a local variable
  • assign a new ArrayList() to data, ready for the next dump request
  • return the local variable, i.e. the old ArrayList with the trace data

The main benefit is that there is a clear line between the writing and reading sides at the point where a new collection is assigned to data. It would also allow the data collection to be compacted, because data is given a fresh ArrayList.

Contributor Author (mhlidd):
I'm not fully sure I understand the point of making data volatile. From what I understand, it essentially lets us "clear" the data list in collectTraces() by moving it to a local variable (whose memory becomes reclaimable once the caller finishes with it) and instantiating a clean ArrayList for future traces until the next dump. Is this understanding correct?
private int index = 0;

@Override
public void accept(Element pendingTrace) {
DATA.add(pendingTrace);
}

@Override
public Element get() {
if (index < DATA.size()) {
return DATA.get(index++);
}
return null; // Should never reach here or else queue may break according to
// MessagePassingQueue docs
}
}

private static final class CommandElement implements Element {
@Override
public long oldestFinishedTime() {
return 0;
@@ -180,13 +212,20 @@ public void run() {
pendingTrace = queue.take(); // block until available;
}

if (pendingTrace instanceof FlushElement) {
if (pendingTrace == FLUSH_ELEMENT) {
// Since this is an MPSC queue, the drain needs to be called on the consumer thread
queue.drain(WriteDrain.WRITE_DRAIN);
flushCounter.incrementAndGet();
continue;
}

if (pendingTrace == DUMP_ELEMENT) {
queue.drain(DumpDrain.DUMP_DRAIN, 50);
queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.DATA.size());
dumpCounter.incrementAndGet();
continue;
}

// The element is no longer in the queue
pendingTrace.setEnqueued(false);

@@ -208,7 +247,7 @@ public void run() {
// Trace has been unmodified long enough, go ahead and write whatever is finished.
pendingTrace.write();
} else {
// Trace is too new. Requeue it and sleep to avoid a hot loop.
// Trace is too new. Requeue it and sleep to avoid a hot loop.
enqueue(pendingTrace);
Thread.sleep(SLEEP_TIME_MS);
}
@@ -277,4 +316,52 @@ public static PendingTraceBuffer discarding() {
public abstract void flush();

public abstract void enqueue(Element pendingTrace);

private static class TracerDump implements TracerFlare.Reporter {
private static final Comparator<Element> TRACE_BY_START_TIME =
comparingLong(trace -> trace.getRootSpan().getStartTime());
private static final Predicate<Element> NOT_PENDING_TRACE =
element -> !(element instanceof PendingTrace);
private final DelayingPendingTraceBuffer buffer;

private TracerDump(DelayingPendingTraceBuffer buffer) {
this.buffer = buffer;
}

@Override
public void prepareForFlare() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
buffer.yieldOrSleep(loop++);
newCount = buffer.dumpCounter.get();
}
}
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
DelayingPendingTraceBuffer.DumpDrain.DATA.removeIf(NOT_PENDING_TRACE);
// Storing oldest traces first
DelayingPendingTraceBuffer.DumpDrain.DATA.sort((TRACE_BY_START_TIME).reversed());

TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip);
for (Element e : DelayingPendingTraceBuffer.DumpDrain.DATA) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(new ArrayList<>(trace.getSpans()));
}
}
// Releasing memory used for ArrayList in drain
DelayingPendingTraceBuffer.DumpDrain.DATA.clear();
writer.flush();
}
}
}
@@ -168,7 +168,7 @@ class LongRunningTracesTrackerTest extends DDSpecification {

PendingTrace newTraceToTrack() {
PendingTrace trace = factory.create(DDTraceId.ONE)
PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP)
PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP, 0)
return trace
}

@@ -5,7 +5,7 @@ import datadog.communication.monitor.Monitoring
import datadog.trace.SamplingPriorityMetadataChecker
import datadog.trace.api.DDSpanId
import datadog.trace.api.DDTraceId
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.api.flare.TracerFlare
import datadog.trace.api.time.SystemTimeSource
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.NoopPathwayContext
import datadog.trace.bootstrap.instrumentation.api.ScopeSource
@@ -20,8 +20,13 @@ import spock.util.concurrent.PollingConditions
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream

import static datadog.trace.api.sampling.PrioritySampling.UNSET
import static datadog.trace.api.sampling.PrioritySampling.USER_KEEP
import static datadog.trace.core.PendingTraceBuffer.BUFFER_SIZE
import static java.nio.charset.StandardCharsets.UTF_8

@Timeout(5)
class PendingTraceBufferTest extends DDSpecification {
@@ -143,7 +148,7 @@

def "priority sampling is always sent"() {
setup:
def parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), PrioritySampling.USER_KEEP))
def parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), USER_KEEP, 0))
def metadataChecker = new SamplingPriorityMetadataChecker()

when: "Fill the buffer - Only children - Priority taken from root"
@@ -443,6 +448,36 @@ }
}
}

def "testing tracer flare dump"() {
setup:
TracerFlare.addReporter {} // exercises default methods
def dumpReporter = Mock(PendingTraceBuffer.TracerDump)
TracerFlare.addReporter(dumpReporter)
def trace = factory.create(DDTraceId.ONE)
def parent = newSpanOf(trace, UNSET, System.currentTimeMillis() * 1000)
def child = newSpanOf(parent)

when:
parent.finish()
buffer.start()
def entries = buildAndExtractZip()

then:
1 * dumpReporter.prepareForFlare()
1 * dumpReporter.addReportToFlare(_)
1 * dumpReporter.cleanupAfterFlare()
entries.size() == 1
(entries["pending_traces.txt"] as String).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific

then:
child.finish()

then:
trace.size() == 0
trace.pendingReferenceCount == 0
}


def addContinuation(DDSpan span) {
def scope = scopeManager.activate(span, ScopeSource.INSTRUMENTATION, true)
continuations << scope.capture()
@@ -451,10 +486,10 @@ }
}

static DDSpan newSpanOf(PendingTrace trace) {
return newSpanOf(trace, PrioritySampling.UNSET)
return newSpanOf(trace, UNSET, 0)
}

static DDSpan newSpanOf(PendingTrace trace, int samplingPriority) {
static DDSpan newSpanOf(PendingTrace trace, int samplingPriority, long timestampMicro) {
def context = new DDSpanContext(
trace.traceId,
1,
@@ -475,7 +510,7 @@ class PendingTraceBufferTest extends DDSpecification {
NoopPathwayContext.INSTANCE,
false,
PropagationTags.factory().empty())
return DDSpan.create("test", 0, context, null)
return DDSpan.create("test", timestampMicro, context, null)
}

static DDSpan newSpanOf(DDSpan parent) {
@@ -488,7 +523,7 @@ class PendingTraceBufferTest extends DDSpecification {
"fakeService",
"fakeOperation",
"fakeResource",
PrioritySampling.UNSET,
UNSET,
null,
Collections.emptyMap(),
false,
@@ -502,4 +537,27 @@ class PendingTraceBufferTest extends DDSpecification {
PropagationTags.factory().empty())
return DDSpan.create("test", 0, context, null)
}

def buildAndExtractZip() {
TracerFlare.prepareForFlare()
def out = new ByteArrayOutputStream()
try (ZipOutputStream zip = new ZipOutputStream(out)) {
TracerFlare.addReportsToFlare(zip)
} finally {
TracerFlare.cleanupAfterFlare()
}

def entries = [:]

def zip = new ZipInputStream(new ByteArrayInputStream(out.toByteArray()))
def entry
while (entry = zip.nextEntry) {
def bytes = new ByteArrayOutputStream()
bytes << zip
entries.put(entry.name, entry.name.endsWith(".bin")
? bytes.toByteArray() : new String(bytes.toByteArray(), UTF_8))
}

return entries
}
}
Loading