From a7ca92686225dbc963caaffe893bfe3a4f0e1fbb Mon Sep 17 00:00:00 2001 From: Asim M Date: Thu, 20 Feb 2025 00:41:50 +0000 Subject: [PATCH 1/2] Introduce `execution_hint` for Cardinality aggregation (#17312) - backport to 2.19 --------- Signed-off-by: Siddharth Rayabharam Signed-off-by: Asim Mahmood Signed-off-by: Asim M Co-authored-by: Siddharth Rayabharam Co-authored-by: Craig Perkins (cherry picked from commit e3a6ccadc942c64e83bd224031bc4d1c6ab14623) --- CHANGELOG.md | 1 + .../CardinalityAggregationBuilder.java | 39 +++-- .../metrics/CardinalityAggregator.java | 17 ++- .../metrics/CardinalityAggregatorFactory.java | 36 ++++- .../CardinalityAggregatorSupplier.java | 3 +- .../metrics/CardinalityAggregatorTests.java | 137 ++++++++++++++++++ .../aggregations/AggregatorTestCase.java | 8 +- 7 files changed, 221 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc78e30c44b9b..dc1b3311b8261 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add execution_hint to cardinality aggregator request - backport to 2.19 (#[TBD](TBD)) ### Dependencies - Bump netty from 4.1.117.Final to 4.1.118.Final ([#17320](https://github.com/opensearch-project/OpenSearch/pull/17320)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationBuilder.java index a7516a6fd6b24..202a6babafec7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregationBuilder.java @@ -68,6 +68,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation private static final ParseField REHASH = new ParseField("rehash").withAllDeprecated("no replacement - values will always be rehashed"); public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold"); + public static final ParseField EXECUTION_HINT_FIELD = new ParseField("execution_hint"); public static final ObjectParser PARSER = ObjectParser.fromBuilder( NAME, @@ -76,6 +77,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation static { ValuesSourceAggregationBuilder.declareFields(PARSER, true, false, false); PARSER.declareLong(CardinalityAggregationBuilder::precisionThreshold, CardinalityAggregationBuilder.PRECISION_THRESHOLD_FIELD); + PARSER.declareString(CardinalityAggregationBuilder::executionHint, CardinalityAggregationBuilder.EXECUTION_HINT_FIELD); PARSER.declareLong((b, v) -> {/*ignore*/}, REHASH); } @@ -85,6 +87,8 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) { private Long precisionThreshold = null; + private String executionHint = null; + public CardinalityAggregationBuilder(String name) { super(name); } @@ -96,6 +100,7 @@ public CardinalityAggregationBuilder( ) { super(clone, factoriesBuilder, metadata); this.precisionThreshold = clone.precisionThreshold; + this.executionHint = clone.executionHint; } @Override @@ -111,6 +116,9 @@ public CardinalityAggregationBuilder(StreamInput in) throws IOException { if (in.readBoolean()) { precisionThreshold = in.readLong(); } + if (in.getVersion().onOrAfter(Version.V_2_19_1)) { + executionHint = in.readOptionalString(); + } } @Override @@ -125,6 +133,9 @@ protected void innerWriteTo(StreamOutput out) throws IOException { if (hasPrecisionThreshold) { out.writeLong(precisionThreshold); } + if (out.getVersion().onOrAfter(Version.V_2_19_1)) { + out.writeOptionalString(executionHint); + } } @Override @@ -146,13 +157,9 @@ public CardinalityAggregationBuilder precisionThreshold(long precisionThreshold) return this; } - /** - * Get the precision threshold. Higher values improve accuracy but also - * increase memory usage. Will return null if the - * precisionThreshold has not been set yet. - */ - public Long precisionThreshold() { - return precisionThreshold; + public CardinalityAggregationBuilder executionHint(String executionHint) { + this.executionHint = executionHint; + return this; } @Override @@ -162,7 +169,16 @@ protected CardinalityAggregatorFactory innerBuild( AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder ) throws IOException { - return new CardinalityAggregatorFactory(name, config, precisionThreshold, queryShardContext, parent, subFactoriesBuilder, metadata); + return new CardinalityAggregatorFactory( + name, + config, + precisionThreshold, + queryShardContext, + parent, + subFactoriesBuilder, + metadata, + executionHint + ); } @Override @@ -170,12 +186,15 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th if (precisionThreshold != null) { builder.field(PRECISION_THRESHOLD_FIELD.getPreferredName(), precisionThreshold); } + if (executionHint != null) { + builder.field(EXECUTION_HINT_FIELD.getPreferredName(), executionHint); + } return builder; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), precisionThreshold); + return Objects.hash(super.hashCode(), precisionThreshold, executionHint); } @Override @@ -184,7 +203,7 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) return false; if (super.equals(obj) == false) return false; CardinalityAggregationBuilder other = (CardinalityAggregationBuilder) obj; - return Objects.equals(precisionThreshold, other.precisionThreshold); + return Objects.equals(precisionThreshold, other.precisionThreshold) && Objects.equals(executionHint, other.executionHint); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 0f3d975960364..6d8930e50f674 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -89,6 +89,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue private static final Logger logger = LogManager.getLogger(CardinalityAggregator.class); + private final CardinalityAggregatorFactory.ExecutionMode executionMode; private final int precision; private final ValuesSource valuesSource; @@ -113,7 +114,8 @@ public CardinalityAggregator( int precision, SearchContext context, Aggregator parent, - Map metadata + Map metadata, + CardinalityAggregatorFactory.ExecutionMode executionMode ) throws IOException { super(name, context, parent, metadata); // TODO: Stop using nulls here @@ -121,6 +123,7 @@ public CardinalityAggregator( this.precision = precision; this.counts = valuesSource == null ? null : new HyperLogLogPlusPlus(precision, context.bigArrays(), 1); this.valuesSourceConfig = valuesSourceConfig; + this.executionMode = executionMode; } @Override @@ -151,7 +154,11 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException { if (maxOrd == 0) { emptyCollectorsUsed++; return new EmptyCollector(); - } else { + } else if (executionMode == CardinalityAggregatorFactory.ExecutionMode.ORDINALS) { // Force OrdinalsCollector + ordinalsCollectorsUsed++; + collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays()); + } else if (executionMode == null) { + // no hint provided, fall back to heuristics final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd); final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision); // only use ordinals if they don't increase memory usage by more than 25% @@ -164,7 +171,7 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException { } } - if (collector == null) { // not able to build an OrdinalsCollector + if (collector == null) { // not able to build an OrdinalsCollector, or hint is direct stringHashingCollectorsUsed++; collector = new DirectCollector(counts, MurmurHash3Values.hash(valuesSource.bytesValues(ctx))); } @@ -480,7 +487,7 @@ public void close() { * * @opensearch.internal */ - private static class DirectCollector extends Collector { + static class DirectCollector extends Collector { private final MurmurHash3Values hashes; private final HyperLogLogPlusPlus counts; @@ -517,7 +524,7 @@ public void close() { * * @opensearch.internal */ - private static class OrdinalsCollector extends Collector { + static class OrdinalsCollector extends Collector { private static final long SHALLOW_FIXEDBITSET_SIZE = RamUsageEstimator.shallowSizeOfInstance(FixedBitSet.class); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java index 980667b45324e..3d82386d12e57 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorFactory.java @@ -44,6 +44,7 @@ import org.opensearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Locale; import java.util.Map; /** @@ -53,6 +54,33 @@ */ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory { + /** + * Execution mode for cardinality agg + * + * @opensearch.internal + */ + public enum ExecutionMode { + DIRECT, + ORDINALS; + + ExecutionMode() {} + + public static ExecutionMode fromString(String value) { + try { + return ExecutionMode.valueOf(value.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown execution_hint: [" + value + "], expected any of [direct, ordinals]"); + } + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } + } + + private final ExecutionMode executionMode; + private final Long precisionThreshold; CardinalityAggregatorFactory( @@ -62,10 +90,12 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory { QueryShardContext queryShardContext, AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, - Map metadata + Map metadata, + String executionHint ) throws IOException { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); this.precisionThreshold = precisionThreshold; + this.executionMode = executionHint == null ? null : ExecutionMode.fromString(executionHint); } public static void registerAggregators(ValuesSourceRegistry.Builder builder) { @@ -74,7 +104,7 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) { @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { - return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata); + return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode); } @Override @@ -86,7 +116,7 @@ protected Aggregator doCreateInternal( ) throws IOException { return queryShardContext.getValuesSourceRegistry() .getAggregator(CardinalityAggregationBuilder.REGISTRY_KEY, config) - .build(name, config, precision(), searchContext, parent, metadata); + .build(name, config, precision(), searchContext, parent, metadata, executionMode); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorSupplier.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorSupplier.java index d5cb0242762fd..42426697e7629 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorSupplier.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorSupplier.java @@ -51,6 +51,7 @@ Aggregator build( int precision, SearchContext context, Aggregator parent, - Map metadata + Map metadata, + CardinalityAggregatorFactory.ExecutionMode executionMode ) throws IOException; } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java index b5dd27e37c332..6a1f95bcc21f4 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityAggregatorTests.java @@ -37,6 +37,7 @@ import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.KeywordField; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; @@ -66,6 +67,7 @@ import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregatorTestCase; import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; @@ -497,4 +499,139 @@ protected CountingAggregator createCountingAggregator( ) ); } + + private void testAggregationExecutionHint( + AggregationBuilder aggregationBuilder, + Query query, + CheckedConsumer buildIndex, + Consumer verify, + Consumer verifyCollector, + MappedFieldType fieldType + ) throws IOException { + try (Directory directory = newDirectory()) { + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + buildIndex.accept(indexWriter); + indexWriter.close(); + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + CountingAggregator aggregator = new CountingAggregator( + new AtomicInteger(), + createAggregator(aggregationBuilder, indexSearcher, fieldType) + ); + aggregator.preCollection(); + indexSearcher.search(query, aggregator); + aggregator.postCollection(); + + MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + Integer.MAX_VALUE, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + aggregator.context().bigArrays(), + getMockScriptService(), + reduceBucketConsumer, + PipelineAggregator.PipelineTree.EMPTY + ); + InternalCardinality topLevel = (InternalCardinality) aggregator.buildTopLevel(); + InternalCardinality card = (InternalCardinality) topLevel.reduce(Collections.singletonList(topLevel), context); + doAssertReducedMultiBucketConsumer(card, reduceBucketConsumer); + + verify.accept(card); + verifyCollector.accept(aggregator.getSelectedCollector()); + } + } + } + + public void testInvalidExecutionHint() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.LONG); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("number") + .executionHint("invalid"); + assertThrows(IllegalArgumentException.class, () -> testAggregationExecutionHint(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("number", 8))); + iw.addDocument(singleton(new NumericDocValuesField("number", 9))); + }, card -> { + assertEquals(3, card.getValue(), 0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof CardinalityAggregator.DirectCollector); }, fieldType)); + } + + public void testNoExecutionHintWithNumericDocValues() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.LONG); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("number"); + testAggregationExecutionHint(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("number", 8))); + iw.addDocument(singleton(new NumericDocValuesField("number", 9))); + }, card -> { + assertEquals(3, card.getValue(), 0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof CardinalityAggregator.DirectCollector); }, fieldType); + } + + public void testDirectExecutionHintWithNumericDocValues() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.LONG); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("number") + .executionHint("direct"); + testAggregationExecutionHint(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("number", 8))); + iw.addDocument(singleton(new NumericDocValuesField("number", 9))); + }, card -> { + assertEquals(3, card.getValue(), 0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof CardinalityAggregator.DirectCollector); }, fieldType); + } + + public void testOrdinalsExecutionHintWithNumericDocValues() throws IOException { + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.LONG); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("number") + .executionHint("ordinals"); + testAggregationExecutionHint(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("number", 8))); + iw.addDocument(singleton(new NumericDocValuesField("number", 9))); + }, card -> { + assertEquals(3, card.getValue(), 0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof CardinalityAggregator.DirectCollector); }, fieldType); + } + + public void testNoExecutionHintWithByteValues() throws IOException { + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field"); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field"); + + testAggregationExecutionHint(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new SortedDocValuesField("field", new BytesRef()))); + }, card -> { + assertEquals(1, card.getValue(), 0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof CardinalityAggregator.OrdinalsCollector); }, fieldType); + } + + public void testDirectExecutionHintWithByteValues() throws IOException { + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field"); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field") + .executionHint("direct"); + testAggregationExecutionHint(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new SortedDocValuesField("field", new BytesRef()))); + }, card -> { + assertEquals(1, card.getValue(), 0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof CardinalityAggregator.DirectCollector); }, fieldType); + } + + public void testOrdinalsExecutionHintWithByteValues() throws IOException { + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("field"); + final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("_name").field("field") + .executionHint("ordinals"); + testAggregationExecutionHint(aggregationBuilder, new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new SortedDocValuesField("field", new BytesRef()))); + }, card -> { + assertEquals(1, card.getValue(), 0); + assertTrue(AggregationInspectionHelper.hasValue(card)); + }, collector -> { assertTrue(collector instanceof CardinalityAggregator.OrdinalsCollector); }, fieldType); + } } diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 5d322637dc645..2c3589f8a7377 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -1328,6 +1328,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { protected static class CountingAggregator extends Aggregator { private final AtomicInteger collectCounter; public final Aggregator delegate; + private LeafBucketCollector selectedCollector; public CountingAggregator(AtomicInteger collectCounter, Aggregator delegate) { this.collectCounter = collectCounter; @@ -1338,6 +1339,10 @@ public AtomicInteger getCollectCount() { return collectCounter; } + public LeafBucketCollector getSelectedCollector() { + return selectedCollector; + } + @Override public void close() { delegate.close(); @@ -1378,7 +1383,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOExce return new LeafBucketCollector() { @Override public void collect(int doc, long bucket) throws IOException { - delegate.getLeafCollector(ctx).collect(doc, bucket); + selectedCollector = delegate.getLeafCollector(ctx); + selectedCollector.collect(doc, bucket); collectCounter.incrementAndGet(); } }; From c7c3da1fb614158dce9e69dae7d27c6d982d10b7 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Fri, 21 Feb 2025 19:54:37 +0000 Subject: [PATCH 2/2] Update changelog Signed-off-by: Asim Mahmood --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc1b3311b8261..47accf1340e3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added -- Add execution_hint to cardinality aggregator request - backport to 2.19 (#[TBD](TBD)) +- Add execution_hint to cardinality aggregator request - backport to 2.19 (#[17420](https://github.com/opensearch-project/OpenSearch/pull/17420)) ### Dependencies - Bump netty from 4.1.117.Final to 4.1.118.Final ([#17320](https://github.com/opensearch-project/OpenSearch/pull/17320))