From 6341a9bfa46fb1e7fbfd839db8cd325b64132243 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Tue, 3 Dec 2024 10:36:56 -0600 Subject: [PATCH 1/7] Tag dsm checkpoints with product mask --- .../datastreams/DefaultPathwayContext.java | 15 ++++++++++++- .../trace/core/datastreams/TagsProcessor.java | 1 + .../main/java/datadog/trace/api/Config.java | 22 +++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 1fec6b9852b..8cc2a179300 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -1,5 +1,6 @@ package datadog.trace.core.datastreams; +import static datadog.trace.core.datastreams.TagsProcessor.PRODUCTS_MASK; import static java.nio.charset.StandardCharsets.ISO_8859_1; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -52,6 +53,7 @@ public class DefaultPathwayContext implements PathwayContext { // direction != current direction private long closestOppositeDirectionHash; private String previousDirection; + private String productMaskTag; private static final Set hashableTagKeys = new HashSet( @@ -117,6 +119,15 @@ public void setCheckpoint( setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0); } + private String getProductMaskTag() { + // it's fine to cache the value per context + if (productMaskTag == null) { + productMaskTag = PRODUCTS_MASK + ":" + Config.get().enabledProductsMask(); + } + + return productMaskTag; + } + @Override public void setCheckpoint( LinkedHashMap sortedTags, @@ -129,7 +140,9 @@ public void setCheckpoint( try { // So far, each tag key has only one tag value, so we're initializing the capacity to match // the number of tag keys for now. We should revisit this later if it's no longer the case. 
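  // Illustration, not part of the patch: with the product bits defined in Config further down
  // (APM=1, DSM=2, DJM=4, PROFILING=8), a service running APM with Data Streams enabled would
  // report enabledProductsMask() == 1 | 2 == 3, so getProductMaskTag() above yields the
  // checkpoint tag "products_mask:3" (computed once and cached per pathway context).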
- List allTags = new ArrayList<>(sortedTags.size()); + List allTags = new ArrayList<>(sortedTags.size() + 1); + allTags.add(getProductMaskTag()); + PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java index 1838b47239b..640da22a299 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java @@ -21,6 +21,7 @@ public String apply(String key) { } public static final String MANUAL_TAG = "manual_checkpoint"; + public static final String PRODUCTS_MASK = "products_mask"; public static final String TYPE_TAG = "type"; private static final DDCache TYPE_TAG_CACHE = DDCaches.newFixedSizeCache(32); private static final Function TYPE_TAG_PREFIX = new StringPrefix("type:"); diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index a67d0c519bb..327324b6fca 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -3244,6 +3244,28 @@ public boolean isDataJobsEnabled() { return dataJobsEnabled; } + private static final int APM_PRODUCT = 1; // 00000001 + private static final int DSM_PRODUCT = 2; // 00000010 + private static final int DJM_PRODUCT = 4; // 00000100 + private static final int PROFILING_PRODUCT = 8; // 00001000 + + // enabledProductsMask can be extended as needed + public long enabledProductsMask() { + long enabledProducts = APM_PRODUCT; + + if (isDataStreamsEnabled()) { + enabledProducts |= DSM_PRODUCT; + } + if (isDataJobsEnabled()) { + enabledProducts |= DJM_PRODUCT; + } + if (isProfilingEnabled()) { + enabledProducts |= PROFILING_PRODUCT; + } + + return enabledProducts; + } + public String getDataJobsCommandPattern() { return dataJobsCommandPattern; } From ea77f9cd0fe70932f163f0bb38d8a8e14b8e4b33 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Tue, 3 Dec 2024 12:28:58 -0600 Subject: [PATCH 2/7] Fixed failing tests --- .../DefaultPathwayContextTest.groovy | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index ecdce8e0651..cdece5e1414 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -74,8 +74,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 2 verifyFirstPoint(pointConsumer.points[0]) with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == 25 @@ -97,8 +97,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { context.isStarted() pointConsumer.points.size() == 1 with(pointConsumer.points[0]) { - edgeTags == ["group:group", 
"topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 hash != 0 payloadSizeBytes == 72 } @@ -124,16 +124,16 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 3 verifyFirstPoint(pointConsumer.points[0]) with(pointConsumer.points[1]) { - edgeTags == ["direction:in", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["products_mask:1", "direction:in", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 5 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == 25 edgeLatencyNano == 25 } with(pointConsumer.points[2]) { - edgeTags == ["direction:in", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["products_mask:1", "direction:in", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 5 // this point should have the first point as parent, // as the loop protection will reset the parent if two identical // points (same hash for tag values) are about to form a hierarchy @@ -201,8 +201,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(27) @@ -221,8 +221,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { context.isStarted() pointConsumer.points.size() == 1 with(pointConsumer.points[0]) { - edgeTags == ["type:internal"] - edgeTags.size() == 1 + edgeTags == ["products_mask:1", "type:internal"] + edgeTags.size() == 2 parentHash == 0 hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(200) @@ -250,8 +250,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -269,8 +269,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -300,8 +300,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -320,8 +320,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 3 
+ edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -349,8 +349,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(27) @@ -378,8 +378,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -397,8 +397,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -447,8 +447,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -467,8 +467,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -498,8 +498,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["topic:topic", "type:sqs"] - edgeTags.size() == 2 + edgeTags == ["products_mask:1", "topic:topic", "type:sqs"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -518,8 +518,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["topic:topicB", "type:sqs"] - edgeTags.size() == 2 + edgeTags == ["products_mask:1", "topic:topicB", "type:sqs"] + edgeTags.size() == 3 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -545,15 +545,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 3 verifyFirstPoint(pointConsumer.points[0]) with(pointConsumer.points[1]) { - edgeTags == ["group:group", "topic:topic", "type:type"] 
- edgeTags.size() == 3 + edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:type"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == 25 edgeLatencyNano == 25 } with(pointConsumer.points[2]) { - edgeTags.size() == 0 + edgeTags.size() == 1 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == 50 From 6bb98df3933a91debc7c9aa9fad241517efb5f18 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:30:59 -0600 Subject: [PATCH 3/7] Moved product mask logic from the config to DSM context --- .../datastreams/DefaultPathwayContext.java | 22 ++++++++++++++++--- .../main/java/datadog/trace/api/Config.java | 22 ------------------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 8cc2a179300..aa57a6e659a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -119,10 +119,26 @@ public void setCheckpoint( setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0); } - private String getProductMaskTag() { - // it's fine to cache the value per context + // extend the list as needed + private static final int APM_PRODUCT = 1; // 00000001 + private static final int DSM_PRODUCT = 2; // 00000010 + private static final int DJM_PRODUCT = 4; // 00000100 + private static final int PROFILING_PRODUCT = 8; // 00001000 + + public String getProductMaskTag() { if (productMaskTag == null) { - productMaskTag = PRODUCTS_MASK + ":" + Config.get().enabledProductsMask(); + long enabledProducts = APM_PRODUCT; + if (Config.get().isDataStreamsEnabled()) { + enabledProducts |= DSM_PRODUCT; + } + if (Config.get().isDataJobsEnabled()) { + enabledProducts |= DJM_PRODUCT; + } + if (Config.get().isProfilingEnabled()) { + enabledProducts |= PROFILING_PRODUCT; + } + + productMaskTag = PRODUCTS_MASK + ":" + enabledProducts; } return productMaskTag; diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index ea001cefe46..d0b3c7e8235 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -3244,28 +3244,6 @@ public boolean isDataJobsEnabled() { return dataJobsEnabled; } - private static final int APM_PRODUCT = 1; // 00000001 - private static final int DSM_PRODUCT = 2; // 00000010 - private static final int DJM_PRODUCT = 4; // 00000100 - private static final int PROFILING_PRODUCT = 8; // 00001000 - - // enabledProductsMask can be extended as needed - public long enabledProductsMask() { - long enabledProducts = APM_PRODUCT; - - if (isDataStreamsEnabled()) { - enabledProducts |= DSM_PRODUCT; - } - if (isDataJobsEnabled()) { - enabledProducts |= DJM_PRODUCT; - } - if (isProfilingEnabled()) { - enabledProducts |= PROFILING_PRODUCT; - } - - return enabledProducts; - } - public String getDataJobsCommandPattern() { return dataJobsCommandPattern; } From 408bde428d123462f11c81668032ad9b67f184f6 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:05:16 -0600 Subject: [PATCH 4/7] Moved product mask to DSM payload level --- 
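Note on this change (illustrative, not part of the commit): once the mask is emitted a single
time at the payload level instead of on every checkpoint, a consumer reads it as one more
top-level field. A minimal sketch of that read path, reusing the msgpack-java API the tests
below already use ("ProductMask" is the key written by the new writer code; payloadBytes is
assumed to be an already-decompressed DSM stats payload):

    import java.io.IOException;
    import org.msgpack.core.MessagePack;
    import org.msgpack.core.MessageUnpacker;

    final class ProductMaskReader {
      // Returns the ProductMask field from a decompressed DSM stats payload, or 0 if absent.
      static long readProductMask(byte[] payloadBytes) throws IOException {
        MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(payloadBytes);
        int fields = unpacker.unpackMapHeader();   // becomes 8 once the header count is bumped later in the series
        for (int i = 0; i < fields; i++) {
          String key = unpacker.unpackString();
          if ("ProductMask".equals(key)) {
            return unpacker.unpackLong();          // e.g. 3 == APM | DSM
          }
          unpacker.skipValue();                    // skip the values we are not interested in here
        }
        return 0L;
      }
    }
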
.../datastreams/DefaultPathwayContext.java | 30 +------- .../MsgPackDatastreamsPayloadWriter.java | 26 +++++++ .../trace/core/datastreams/TagsProcessor.java | 1 - .../datastreams/DataStreamsWritingTest.groovy | 3 + .../DefaultPathwayContextTest.groovy | 74 +++++++++---------- 5 files changed, 67 insertions(+), 67 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index aa57a6e659a..40bcddcb674 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -1,6 +1,5 @@ package datadog.trace.core.datastreams; -import static datadog.trace.core.datastreams.TagsProcessor.PRODUCTS_MASK; import static java.nio.charset.StandardCharsets.ISO_8859_1; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -119,31 +118,6 @@ public void setCheckpoint( setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0); } - // extend the list as needed - private static final int APM_PRODUCT = 1; // 00000001 - private static final int DSM_PRODUCT = 2; // 00000010 - private static final int DJM_PRODUCT = 4; // 00000100 - private static final int PROFILING_PRODUCT = 8; // 00001000 - - public String getProductMaskTag() { - if (productMaskTag == null) { - long enabledProducts = APM_PRODUCT; - if (Config.get().isDataStreamsEnabled()) { - enabledProducts |= DSM_PRODUCT; - } - if (Config.get().isDataJobsEnabled()) { - enabledProducts |= DJM_PRODUCT; - } - if (Config.get().isProfilingEnabled()) { - enabledProducts |= PROFILING_PRODUCT; - } - - productMaskTag = PRODUCTS_MASK + ":" + enabledProducts; - } - - return productMaskTag; - } - @Override public void setCheckpoint( LinkedHashMap sortedTags, @@ -156,9 +130,7 @@ public void setCheckpoint( try { // So far, each tag key has only one tag value, so we're initializing the capacity to match // the number of tag keys for now. We should revisit this later if it's no longer the case. 
- List allTags = new ArrayList<>(sortedTags.size() + 1); - allTags.add(getProductMaskTag()); - + List allTags = new ArrayList<>(sortedTags.size()); PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index e6dc2a18e05..4038ec2e026 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -6,6 +6,7 @@ import datadog.communication.serialization.Writable; import datadog.communication.serialization.WritableFormatter; import datadog.communication.serialization.msgpack.MsgPackWriter; +import datadog.trace.api.Config; import datadog.trace.api.WellKnownTags; import datadog.trace.common.metrics.Sink; import java.util.Collection; @@ -31,6 +32,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] PARENT_HASH = "ParentHash".getBytes(ISO_8859_1); private static final byte[] BACKLOG_VALUE = "Value".getBytes(ISO_8859_1); private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1); + private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1); private static final int INITIAL_CAPACITY = 512 * 1024; @@ -55,6 +57,27 @@ public void reset() { buffer.reset(); } + // extend the list as needed + private static final int APM_PRODUCT = 1; // 00000001 + private static final int DSM_PRODUCT = 2; // 00000010 + private static final int DJM_PRODUCT = 4; // 00000100 + private static final int PROFILING_PRODUCT = 8; // 00001000 + + public long getProductsMask() { + long productsMask = APM_PRODUCT; + if (Config.get().isDataStreamsEnabled()) { + productsMask |= DSM_PRODUCT; + } + if (Config.get().isDataJobsEnabled()) { + productsMask |= DJM_PRODUCT; + } + if (Config.get().isProfilingEnabled()) { + productsMask |= PROFILING_PRODUCT; + } + + return productsMask; + } + @Override public void writePayload(Collection data, String serviceNameOverride) { writer.startMap(7); @@ -112,6 +135,9 @@ public void writePayload(Collection data, String serviceNameOverrid } } + writer.writeUTF8(PRODUCTS_MASK); + writer.writeLong(getProductsMask()); + buffer.mark(); sink.accept(buffer.messageCount(), buffer.slice()); buffer.reset(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java index 640da22a299..1838b47239b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java @@ -21,7 +21,6 @@ public String apply(String key) { } public static final String MANUAL_TAG = "manual_checkpoint"; - public static final String PRODUCTS_MASK = "products_mask"; public static final String TYPE_TAG = "type"; private static final DDCache TYPE_TAG_CACHE = DDCaches.newFixedSizeCache(32); private static final Function TYPE_TAG_PREFIX = new StringPrefix("type:"); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index 
5f01afeafa7..767953ac9ed 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -262,6 +262,9 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == (hash == 1 ? "topic:testTopic" : "topic:testTopic2") } + assert unpacker.unpackString() == "ProductMask" + assert unpacker.unpackLong() == 1 + return true } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index cdece5e1414..ecdce8e0651 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -74,8 +74,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 2 verifyFirstPoint(pointConsumer.points[0]) with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == 25 @@ -97,8 +97,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { context.isStarted() pointConsumer.points.size() == 1 with(pointConsumer.points[0]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 hash != 0 payloadSizeBytes == 72 } @@ -124,16 +124,16 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 3 verifyFirstPoint(pointConsumer.points[0]) with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "direction:in", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 5 + edgeTags == ["direction:in", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == 25 edgeLatencyNano == 25 } with(pointConsumer.points[2]) { - edgeTags == ["products_mask:1", "direction:in", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 5 + edgeTags == ["direction:in", "group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 4 // this point should have the first point as parent, // as the loop protection will reset the parent if two identical // points (same hash for tag values) are about to form a hierarchy @@ -201,8 +201,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(27) @@ -221,8 +221,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { context.isStarted() pointConsumer.points.size() == 1 with(pointConsumer.points[0]) { - edgeTags == ["products_mask:1", "type:internal"] - edgeTags.size() == 2 + edgeTags == ["type:internal"] + edgeTags.size() == 1 parentHash == 0 hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(200) @@ -250,8 +250,8 @@ class 
DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -269,8 +269,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -300,8 +300,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -320,8 +320,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -349,8 +349,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(27) @@ -378,8 +378,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -397,8 +397,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -447,8 +447,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 
0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -467,8 +467,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["products_mask:1", "group:group", "topic:topicB", "type:kafka"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topicB", "type:kafka"] + edgeTags.size() == 3 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -498,8 +498,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { decodedContext.isStarted() pointConsumer.points.size() == 2 with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "topic:topic", "type:sqs"] - edgeTags.size() == 3 + edgeTags == ["topic:topic", "type:sqs"] + edgeTags.size() == 2 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(26) @@ -518,8 +518,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification { secondDecode.isStarted() pointConsumer.points.size() == 3 with(pointConsumer.points[2]) { - edgeTags == ["products_mask:1", "topic:topicB", "type:sqs"] - edgeTags.size() == 3 + edgeTags == ["topic:topicB", "type:sqs"] + edgeTags.size() == 2 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == MILLISECONDS.toNanos(58) @@ -545,15 +545,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification { pointConsumer.points.size() == 3 verifyFirstPoint(pointConsumer.points[0]) with(pointConsumer.points[1]) { - edgeTags == ["products_mask:1", "group:group", "topic:topic", "type:type"] - edgeTags.size() == 4 + edgeTags == ["group:group", "topic:topic", "type:type"] + edgeTags.size() == 3 parentHash == pointConsumer.points[0].hash hash != 0 pathwayLatencyNano == 25 edgeLatencyNano == 25 } with(pointConsumer.points[2]) { - edgeTags.size() == 1 + edgeTags.size() == 0 parentHash == pointConsumer.points[1].hash hash != 0 pathwayLatencyNano == 50 From 0d0c395452d1f16fa340c2e625c6debfe2ac0d36 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:13:40 -0600 Subject: [PATCH 5/7] Cleanup --- .../datadog/trace/core/datastreams/DefaultPathwayContext.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 40bcddcb674..1fec6b9852b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -52,7 +52,6 @@ public class DefaultPathwayContext implements PathwayContext { // direction != current direction private long closestOppositeDirectionHash; private String previousDirection; - private String productMaskTag; private static final Set hashableTagKeys = new HashSet( From 36168c35e7bf44141761d18297c88c3a9b870a72 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Wed, 4 Dec 2024 10:40:03 -0600 Subject: [PATCH 6/7] Fixed serialization --- .../core/datastreams/MsgPackDatastreamsPayloadWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 4038ec2e026..2baa8943de0 100644 
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -80,7 +80,7 @@ public long getProductsMask() { @Override public void writePayload(Collection data, String serviceNameOverride) { - writer.startMap(7); + writer.startMap(8); /* 1 */ writer.writeUTF8(ENV); writer.writeUTF8(wellKnownTags.getEnv()); @@ -135,6 +135,7 @@ public void writePayload(Collection data, String serviceNameOverrid } } + /* 8 */ writer.writeUTF8(PRODUCTS_MASK); writer.writeLong(getProductsMask()); From d0c8837ed706d25a41a15cd2604de51cdc76ece0 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko <21974069+kr-igor@users.noreply.github.com> Date: Wed, 4 Dec 2024 11:37:30 -0600 Subject: [PATCH 7/7] Fixed tests --- .../trace/core/datastreams/DataStreamsWritingTest.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index 767953ac9ed..6d163d0ac79 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -96,7 +96,7 @@ class DataStreamsWritingTest extends DDCoreSpecification { BufferedSource bufferedSource = Okio.buffer(gzipSource) MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream()) - assert unpacker.unpackMapHeader() == 7 + assert unpacker.unpackMapHeader() == 8 assert unpacker.unpackString() == "Env" assert unpacker.unpackString() == "test" assert unpacker.unpackString() == "Service" @@ -161,7 +161,7 @@ class DataStreamsWritingTest extends DDCoreSpecification { BufferedSource bufferedSource = Okio.buffer(gzipSource) MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bufferedSource.inputStream()) - assert unpacker.unpackMapHeader() == 7 + assert unpacker.unpackMapHeader() == 8 assert unpacker.unpackString() == "Env" assert unpacker.unpackString() == "test" assert unpacker.unpackString() == "Service"
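For reference, the bit arithmetic behind the ProductMask value these tests assert on, as a
standalone sketch (the bit values mirror the constants in MsgPackDatastreamsPayloadWriter;
the boolean flags below are placeholders standing in for the corresponding Config checks):

    // APM_PRODUCT = 1, DSM_PRODUCT = 2, DJM_PRODUCT = 4, PROFILING_PRODUCT = 8
    long mask = 1L;                     // the APM bit is always set
    boolean dataStreamsEnabled = true;  // stands in for Config.get().isDataStreamsEnabled()
    boolean profilingEnabled = false;   // stands in for Config.get().isProfilingEnabled()
    if (dataStreamsEnabled) { mask |= 2L; }  // mask == 3  (0b0011)
    if (profilingEnabled)   { mask |= 8L; }  // would give 11 (0b1011)
    // The assertions above expect unpackLong() == 1, i.e. only the APM bit ends up set
    // with the Config used in these tests.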