Skip to content

Commit

Permalink
Canonicalize processor names and types in IngestStats (elastic#122610)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo authored Feb 14, 2025
1 parent 9141335 commit d59a0d9
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 11 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/122610.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122610
summary: Canonicalize processor names and types in `IngestStats`
area: Ingest Node
type: bug
issues: []
35 changes: 25 additions & 10 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1195,20 +1195,35 @@ static String getProcessorName(Processor processor) {
if (processor instanceof ConditionalProcessor conditionalProcessor) {
processor = conditionalProcessor.getInnerProcessor();
}
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());

String tag = processor.getTag();
if (tag != null && tag.isEmpty()) {
tag = null; // it simplifies the rest of the logic slightly to coalesce to null
}

String pipelineName = null;
if (processor instanceof PipelineProcessor pipelineProcessor) {
String pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
sb.append(":");
sb.append(pipelineName);
pipelineName = pipelineProcessor.getPipelineTemplate().newInstance(Map.of()).execute();
}
String tag = processor.getTag();
if (tag != null && tag.isEmpty() == false) {
sb.append(":");
sb.append(tag);

// if there's a tag, OR if it's a pipeline processor, then the processor name is a compound thing,
// BUT if neither of those apply, then it's just the type -- so we can return the type itself without
// allocating a new String object
if (tag == null && pipelineName == null) {
return processor.getType();
} else {
StringBuilder sb = new StringBuilder(5);
sb.append(processor.getType());
if (pipelineName != null) {
sb.append(":");
sb.append(pipelineName);
}
if (tag != null) {
sb.append(":");
sb.append(tag);
}
return sb.toString();
}
return sb.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats)
implements
Expand All @@ -57,6 +58,11 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
* Read from a stream.
*/
public static IngestStats read(StreamInput in) throws IOException {
// while reading the processors, we're going to encounter identical name and type strings *repeatedly*
// it's advantageous to discard the endless copies of the same strings and canonical-ize them to keep our
// heap usage under control. note: this map is key to key, because of the limitations of the set interface.
final Map<String, String> namesAndTypesCache = new HashMap<>();

var stats = readStats(in);
var size = in.readVInt();
if (stats == Stats.IDENTITY && size == 0) {
Expand All @@ -76,6 +82,9 @@ public static IngestStats read(StreamInput in) throws IOException {
var processorName = in.readString();
var processorType = in.readString();
var processorStat = readStats(in);
// pass these name and type through the local names and types cache to canonical-ize them
processorName = namesAndTypesCache.computeIfAbsent(processorName, Function.identity());
processorType = namesAndTypesCache.computeIfAbsent(processorType, Function.identity());
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
}
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2181,7 +2181,7 @@ public void testStatName() {
Processor processor = mock(Processor.class);
String name = randomAlphaOfLength(10);
when(processor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(processor), equalTo(name));
assertThat(IngestService.getProcessorName(processor), sameInstance(name));
String tag = randomAlphaOfLength(10);
when(processor.getTag()).thenReturn(tag);
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;

public class IngestStatsTests extends ESTestCase {
Expand All @@ -37,6 +38,33 @@ public void testIdentitySerialization() throws IOException {
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
}

public void testProcessorNameAndTypeIdentitySerialization() throws IOException {
IngestStats.Builder builder = new IngestStats.Builder();
builder.addPipelineMetrics("pipeline_id", new IngestPipelineMetric());
builder.addProcessorMetrics("pipeline_id", "set", "set", new IngestMetric());
builder.addProcessorMetrics("pipeline_id", "set:foo", "set", new IngestMetric());
builder.addProcessorMetrics("pipeline_id", "set:bar", "set", new IngestMetric());
builder.addTotalMetrics(new IngestMetric());

IngestStats serializedStats = serialize(builder.build());
List<IngestStats.ProcessorStat> processorStats = serializedStats.processorStats().get("pipeline_id");

// these are just table stakes
assertThat(processorStats.get(0).name(), is("set"));
assertThat(processorStats.get(0).type(), is("set"));
assertThat(processorStats.get(1).name(), is("set:foo"));
assertThat(processorStats.get(1).type(), is("set"));
assertThat(processorStats.get(2).name(), is("set:bar"));
assertThat(processorStats.get(2).type(), is("set"));

// this is actually interesting, though -- we're canonical-izing these strings to keep our heap usage under control
final String set = processorStats.get(0).name();
assertThat(processorStats.get(0).name(), sameInstance(set));
assertThat(processorStats.get(0).type(), sameInstance(set));
assertThat(processorStats.get(1).type(), sameInstance(set));
assertThat(processorStats.get(2).type(), sameInstance(set));
}

public void testStatsMerge() {
var first = randomStats();
var second = randomStats();
Expand Down

0 comments on commit d59a0d9

Please sign in to comment.