Skip to content

Commit

Permalink
Introduce execution_hint for Cardinality aggregation (#17312)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Siddharth Rayabharam <[email protected]>
Signed-off-by: Asim Mahmood <[email protected]>
Signed-off-by: Asim M <[email protected]>
Co-authored-by: Siddharth Rayabharam <[email protected]>
Co-authored-by: Craig Perkins <[email protected]>
  • Loading branch information
3 people authored Feb 20, 2025
1 parent 43e589a commit e3a6cca
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
- Add execution_hint to cardinality aggregator request (#[17312](https://github.com/opensearch-project/OpenSearch/pull/17312))

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation

private static final ParseField REHASH = new ParseField("rehash").withAllDeprecated("no replacement - values will always be rehashed");
public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold");
public static final ParseField EXECUTION_HINT_FIELD = new ParseField("execution_hint");

public static final ObjectParser<CardinalityAggregationBuilder, String> PARSER = ObjectParser.fromBuilder(
NAME,
Expand All @@ -76,6 +77,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
static {
ValuesSourceAggregationBuilder.declareFields(PARSER, true, false, false);
PARSER.declareLong(CardinalityAggregationBuilder::precisionThreshold, CardinalityAggregationBuilder.PRECISION_THRESHOLD_FIELD);
PARSER.declareString(CardinalityAggregationBuilder::executionHint, CardinalityAggregationBuilder.EXECUTION_HINT_FIELD);
PARSER.declareLong((b, v) -> {/*ignore*/}, REHASH);
}

Expand All @@ -85,6 +87,8 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {

private Long precisionThreshold = null;

private String executionHint = null;

public CardinalityAggregationBuilder(String name) {
super(name);
}
Expand All @@ -96,6 +100,7 @@ public CardinalityAggregationBuilder(
) {
super(clone, factoriesBuilder, metadata);
this.precisionThreshold = clone.precisionThreshold;
this.executionHint = clone.executionHint;
}

@Override
Expand All @@ -111,6 +116,9 @@ public CardinalityAggregationBuilder(StreamInput in) throws IOException {
if (in.readBoolean()) {
precisionThreshold = in.readLong();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
executionHint = in.readOptionalString();
}
}

@Override
Expand All @@ -125,6 +133,9 @@ protected void innerWriteTo(StreamOutput out) throws IOException {
if (hasPrecisionThreshold) {
out.writeLong(precisionThreshold);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalString(executionHint);
}
}

@Override
Expand All @@ -146,13 +157,9 @@ public CardinalityAggregationBuilder precisionThreshold(long precisionThreshold)
return this;
}

/**
* Get the precision threshold. Higher values improve accuracy but also
* increase memory usage. Will return <code>null</code> if the
* precisionThreshold has not been set yet.
*/
public Long precisionThreshold() {
return precisionThreshold;
public CardinalityAggregationBuilder executionHint(String executionHint) {
this.executionHint = executionHint;
return this;
}

@Override
Expand All @@ -162,20 +169,32 @@ protected CardinalityAggregatorFactory innerBuild(
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder
) throws IOException {
return new CardinalityAggregatorFactory(name, config, precisionThreshold, queryShardContext, parent, subFactoriesBuilder, metadata);
return new CardinalityAggregatorFactory(
name,
config,
precisionThreshold,
queryShardContext,
parent,
subFactoriesBuilder,
metadata,
executionHint
);
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (precisionThreshold != null) {
builder.field(PRECISION_THRESHOLD_FIELD.getPreferredName(), precisionThreshold);
}
if (executionHint != null) {
builder.field(EXECUTION_HINT_FIELD.getPreferredName(), executionHint);
}
return builder;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), precisionThreshold);
return Objects.hash(super.hashCode(), precisionThreshold, executionHint);
}

@Override
Expand All @@ -184,7 +203,7 @@ public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
CardinalityAggregationBuilder other = (CardinalityAggregationBuilder) obj;
return Objects.equals(precisionThreshold, other.precisionThreshold);
return Objects.equals(precisionThreshold, other.precisionThreshold) && Objects.equals(executionHint, other.executionHint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue

private static final Logger logger = LogManager.getLogger(CardinalityAggregator.class);

private final CardinalityAggregatorFactory.ExecutionMode executionMode;
private final int precision;
private final ValuesSource valuesSource;

Expand All @@ -113,14 +114,16 @@ public CardinalityAggregator(
int precision,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Map<String, Object> metadata,
CardinalityAggregatorFactory.ExecutionMode executionMode
) throws IOException {
super(name, context, parent, metadata);
// TODO: Stop using nulls here
this.valuesSource = valuesSourceConfig.hasValues() ? valuesSourceConfig.getValuesSource() : null;
this.precision = precision;
this.counts = valuesSource == null ? null : new HyperLogLogPlusPlus(precision, context.bigArrays(), 1);
this.valuesSourceConfig = valuesSourceConfig;
this.executionMode = executionMode;
}

@Override
Expand All @@ -144,14 +147,17 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException {
}

Collector collector = null;
if (valuesSource instanceof ValuesSource.Bytes.WithOrdinals) {
ValuesSource.Bytes.WithOrdinals source = (ValuesSource.Bytes.WithOrdinals) valuesSource;
if (valuesSource instanceof ValuesSource.Bytes.WithOrdinals source) {
final SortedSetDocValues ordinalValues = source.ordinalsValues(ctx);
final long maxOrd = ordinalValues.getValueCount();
if (maxOrd == 0) {
emptyCollectorsUsed++;
return new EmptyCollector();
} else {
} else if (executionMode == CardinalityAggregatorFactory.ExecutionMode.ORDINALS) { // Force OrdinalsCollector
ordinalsCollectorsUsed++;
collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
} else if (executionMode == null) {
// no hint provided, fall back to heuristics
final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
// only use ordinals if they don't increase memory usage by more than 25%
Expand All @@ -164,7 +170,7 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException {
}
}

if (collector == null) { // not able to build an OrdinalsCollector
if (collector == null) { // not able to build an OrdinalsCollector, or hint is direct
stringHashingCollectorsUsed++;
collector = new DirectCollector(counts, MurmurHash3Values.hash(valuesSource.bytesValues(ctx)));
}
Expand Down Expand Up @@ -480,7 +486,7 @@ public void close() {
*
* @opensearch.internal
*/
private static class DirectCollector extends Collector {
static class DirectCollector extends Collector {

private final MurmurHash3Values hashes;
private final HyperLogLogPlusPlus counts;
Expand Down Expand Up @@ -517,7 +523,7 @@ public void close() {
*
* @opensearch.internal
*/
private static class OrdinalsCollector extends Collector {
static class OrdinalsCollector extends Collector {

private static final long SHALLOW_FIXEDBITSET_SIZE = RamUsageEstimator.shallowSizeOfInstance(FixedBitSet.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;

/**
Expand All @@ -53,6 +54,33 @@
*/
class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {

/**
* Execution mode for cardinality agg
*
* @opensearch.internal
*/
public enum ExecutionMode {
DIRECT,
ORDINALS;

ExecutionMode() {}

public static ExecutionMode fromString(String value) {
try {
return ExecutionMode.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown execution_hint: [" + value + "], expected any of [direct, ordinals]");
}
}

@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
}

private final ExecutionMode executionMode;

private final Long precisionThreshold;

CardinalityAggregatorFactory(
Expand All @@ -62,10 +90,12 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
Map<String, Object> metadata,
String executionHint
) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata);
this.precisionThreshold = precisionThreshold;
this.executionMode = executionHint == null ? null : ExecutionMode.fromString(executionHint);
}

public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
Expand All @@ -74,7 +104,7 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata);
return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
}

@Override
Expand All @@ -86,7 +116,7 @@ protected Aggregator doCreateInternal(
) throws IOException {
return queryShardContext.getValuesSourceRegistry()
.getAggregator(CardinalityAggregationBuilder.REGISTRY_KEY, config)
.build(name, config, precision(), searchContext, parent, metadata);
.build(name, config, precision(), searchContext, parent, metadata, executionMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Aggregator build(
int precision,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Map<String, Object> metadata,
CardinalityAggregatorFactory.ExecutionMode executionMode
) throws IOException;
}
Loading

0 comments on commit e3a6cca

Please sign in to comment.