diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index d664b10cfa16e..4b929c692edcd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -82,6 +82,7 @@ public ScriptService scriptService() { public void consumeBucketsAndMaybeBreak(int size) { multiBucketConsumer.accept(size); } + } protected final String name; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java index e11899fff3390..41b1a9aef6230 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java @@ -66,6 +66,12 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException { */ public abstract B createBucket(InternalAggregations aggregations, B prototype); + /** + * Reduce a list of same-keyed buckets (from multiple shards) to a single bucket. This + * requires all buckets to have the same key. + */ + protected abstract B reduceBucket(List buckets, ReduceContext context); + @Override public abstract List getBuckets(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index 46eac6ce55dda..57c7d703cbdf6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -88,21 +88,6 @@ public Aggregations getAggregations() { return aggregations; } - InternalBucket reduce(List buckets, ReduceContext context) { - InternalBucket reduced = null; - List aggregationsList = new ArrayList<>(buckets.size()); - for (InternalBucket bucket : buckets) { - if (reduced == null) { - reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations); - } else { - reduced.docCount += bucket.docCount; - } - aggregationsList.add(bucket.aggregations); - } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); - return reduced; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -212,7 +197,7 @@ public InternalAggregation doReduce(List aggregations, Redu ArrayList reducedBuckets = new ArrayList<>(bucketsMap.size()); for (List sameRangeList : bucketsMap.values()) { - InternalBucket reducedBucket = sameRangeList.get(0).reduce(sameRangeList, reduceContext); + InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext); if(reducedBucket.docCount >= 1){ reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reducedBucket); @@ -228,6 +213,23 @@ public InternalAggregation doReduce(List aggregations, Redu return reduced; } + @Override + protected InternalBucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + InternalBucket reduced = null; + List aggregationsList = new ArrayList<>(buckets.size()); + for (InternalBucket bucket : buckets) { + if (reduced == null) { + reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations); + } else { + reduced.docCount += bucket.docCount; + } + aggregationsList.add(bucket.aggregations); + } + reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); + return reduced; + } + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startArray(CommonFields.BUCKETS.getPreferredName()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index 3da7e85b5f74b..9bfe0e7090e0f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -171,7 +171,7 @@ public InternalAggregation doReduce(List aggregations, Redu while (pq.size() > 0) { BucketIterator bucketIt = pq.poll(); if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) { - InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext); + InternalBucket reduceBucket = reduceBucket(buckets, reduceContext); buckets.clear(); reduceContext.consumeBucketsAndMaybeBreak(1); result.add(reduceBucket); @@ -186,7 +186,7 @@ public InternalAggregation doReduce(List aggregations, Redu } } if (buckets.size() > 0) { - InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext); + InternalBucket reduceBucket = reduceBucket(buckets, reduceContext); reduceContext.consumeBucketsAndMaybeBreak(1); result.add(reduceBucket); } @@ -194,6 +194,19 @@ public InternalAggregation doReduce(List aggregations, Redu return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData); } + @Override + protected InternalBucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + List aggregations = new ArrayList<>(buckets.size()); + long docCount = 0; + for (InternalBucket bucket : buckets) { + docCount += bucket.docCount; + aggregations.add(bucket.aggregations); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + return new InternalBucket(sourceNames, formats, buckets.get(0).key, reverseMuls, docCount, aggs); + } + @Override public boolean equals(Object obj) { if (this == obj) return true; @@ -321,17 +334,6 @@ public Aggregations getAggregations() { return aggregations; } - InternalBucket reduce(List buckets, ReduceContext reduceContext) { - List aggregations = new ArrayList<>(buckets.size()); - long docCount = 0; - for (InternalBucket bucket : buckets) { - docCount += bucket.docCount; - aggregations.add(bucket.aggregations); - } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, reduceContext); - return new InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs); - } - @Override public int compareKey(InternalBucket other) { for (int i = 0; i < key.size(); i++) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java index d866754c5626e..271d1c54d5898 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java @@ -87,21 +87,6 @@ public Aggregations getAggregations() { return aggregations; } - InternalBucket reduce(List buckets, ReduceContext context) { - InternalBucket reduced = null; - List aggregationsList = new ArrayList<>(buckets.size()); - for (InternalBucket bucket : buckets) { - if (reduced == null) { - reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed); - } else { - reduced.docCount += bucket.docCount; - } - aggregationsList.add(bucket.aggregations); - } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); - return reduced; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (keyed) { @@ -227,8 +212,25 @@ public InternalAggregation doReduce(List aggregations, Redu InternalFilters reduced = new InternalFilters(name, new ArrayList<>(bucketsList.size()), keyed, pipelineAggregators(), getMetaData()); for (List sameRangeList : bucketsList) { - reduced.buckets.add((sameRangeList.get(0)).reduce(sameRangeList, reduceContext)); + reduced.buckets.add(reduceBucket(sameRangeList, reduceContext)); + } + return reduced; + } + + @Override + protected InternalBucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + InternalBucket reduced = null; + List aggregationsList = new ArrayList<>(buckets.size()); + for (InternalBucket bucket : buckets) { + if (reduced == null) { + reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed); + } else { + reduced.docCount += bucket.docCount; + } + aggregationsList.add(bucket.aggregations); } + reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); return reduced; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java index 4935b6c6ba7d2..c91f763b603c0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.util.LongHash; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -96,48 +95,6 @@ public void collect(int doc, long bucket) throws IOException { }; } - // private impl that stores a bucket ord. This allows for computing the aggregations lazily. - static class OrdinalBucket extends InternalGeoGridBucket { - - long bucketOrd; - InternalGeoGridBucket sourceBucket; // used to keep track of appropriate getKeyAsString method - - OrdinalBucket(InternalGeoGridBucket sourceBucket) { - super(sourceBucket.hashAsLong, sourceBucket.docCount, sourceBucket.aggregations); - this.sourceBucket = sourceBucket; - } - - void hashAsLong(long hashAsLong) { - this.hashAsLong = hashAsLong; - this.sourceBucket.hashAsLong = hashAsLong; - } - - @Override - InternalGeoGridBucket buildBucket(InternalGeoGridBucket bucket, long hashAsLong, long docCount, - InternalAggregations aggregations) { - OrdinalBucket ordBucket = new OrdinalBucket(bucket); - ordBucket.hashAsLong = hashAsLong; - ordBucket.docCount = docCount; - ordBucket.aggregations = aggregations; - // this is done because the aggregator may be rebuilt from cache (non OrdinalBucket), - // or it may be rebuilding from a new calculation, and therefore copying bucketOrd. - if (bucket instanceof OrdinalBucket) { - ordBucket.bucketOrd = ((OrdinalBucket) bucket).bucketOrd; - } - return ordBucket; - } - - @Override - public Object getKey() { - return sourceBucket.getKey(); - } - - @Override - public String getKeyAsString() { - return sourceBucket.getKeyAsString(); - } - } - abstract T buildAggregation(String name, int requiredSize, List buckets, List pipelineAggregators, Map metaData); @@ -154,24 +111,24 @@ public InternalGeoGrid buildAggregation(long owningBucketOrdinal) throws IOExcep final int size = (int) Math.min(bucketOrds.size(), shardSize); consumeBucketsAndMaybeBreak(size); - BucketPriorityQueue ordered = new BucketPriorityQueue(size); - OrdinalBucket spare = null; + BucketPriorityQueue ordered = new BucketPriorityQueue<>(size); + InternalGeoGridBucket spare = null; for (long i = 0; i < bucketOrds.size(); i++) { if (spare == null) { - spare = new OrdinalBucket(newEmptyBucket()); + spare = newEmptyBucket(); } // need a special function to keep the source bucket // up-to-date so it can get the appropriate key - spare.hashAsLong(bucketOrds.get(i)); + spare.hashAsLong = bucketOrds.get(i); spare.docCount = bucketDocCount(i); spare.bucketOrd = i; - spare = (OrdinalBucket) ordered.insertWithOverflow(spare); + spare = ordered.insertWithOverflow(spare); } final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()]; for (int i = ordered.size() - 1; i >= 0; --i) { - final OrdinalBucket bucket = (OrdinalBucket) ordered.pop(); + final InternalGeoGridBucket bucket = ordered.pop(); bucket.aggregations = bucketAggregations(bucket.bucketOrd); list[i] = bucket; } @@ -183,10 +140,8 @@ public InternalGeoGrid buildEmptyAggregation() { return buildAggregation(name, requiredSize, Collections.emptyList(), pipelineAggregators(), metaData()); } - @Override public void doClose() { Releasables.close(bucketOrds); } - } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java index ae77e69b3e1ec..61c06a062cc05 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -81,15 +82,15 @@ public List getBuckets() { @Override public InternalGeoGrid doReduce(List aggregations, ReduceContext reduceContext) { - LongObjectPagedHashMap> buckets = null; + LongObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalGeoGrid grid = (InternalGeoGrid) aggregation; if (buckets == null) { buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays()); } for (Object obj : grid.buckets) { - B bucket = (B) obj; - List existingBuckets = buckets.get(bucket.hashAsLong()); + InternalGeoGridBucket bucket = (InternalGeoGridBucket) obj; + List existingBuckets = buckets.get(bucket.hashAsLong()); if (existingBuckets == null) { existingBuckets = new ArrayList<>(aggregations.size()); buckets.put(bucket.hashAsLong(), existingBuckets); @@ -100,9 +101,9 @@ public InternalGeoGrid doReduce(List aggregations, ReduceCo final int size = Math.toIntExact(reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size())); BucketPriorityQueue ordered = new BucketPriorityQueue<>(size); - for (LongObjectPagedHashMap.Cursor> cursor : buckets) { - List sameCellBuckets = cursor.value; - InternalGeoGridBucket removed = ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext)); + for (LongObjectPagedHashMap.Cursor> cursor : buckets) { + List sameCellBuckets = cursor.value; + InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext)); if (removed != null) { reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); } else { @@ -117,6 +118,21 @@ public InternalGeoGrid doReduce(List aggregations, ReduceCo return create(getName(), requiredSize, Arrays.asList(list), pipelineAggregators(), getMetaData()); } + @Override + protected InternalGeoGridBucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + List aggregationsList = new ArrayList<>(buckets.size()); + long docCount = 0; + for (InternalGeoGridBucket bucket : buckets) { + docCount += bucket.docCount; + aggregationsList.add(bucket.aggregations); + } + final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + return createBucket(buckets.get(0).hashAsLong, docCount, aggs); + } + + abstract B createBucket(long hashAsLong, long docCount, InternalAggregations aggregations); + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startArray(CommonFields.BUCKETS.getPreferredName()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java index 93002d607eaf8..0df9661aa9ec4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java @@ -23,13 +23,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; public abstract class InternalGeoGridBucket @@ -39,6 +36,8 @@ public abstract class InternalGeoGridBucket protected long docCount; protected InternalAggregations aggregations; + long bucketOrd; + public InternalGeoGridBucket(long hashAsLong, long docCount, InternalAggregations aggregations) { this.docCount = docCount; this.aggregations = aggregations; @@ -61,9 +60,6 @@ public void writeTo(StreamOutput out) throws IOException { aggregations.writeTo(out); } - abstract B buildBucket(InternalGeoGridBucket bucket, long hashAsLong, long docCount, InternalAggregations aggregations); - - long hashAsLong() { return hashAsLong; } @@ -89,17 +85,6 @@ public int compareTo(InternalGeoGridBucket other) { return 0; } - public B reduce(List buckets, InternalAggregation.ReduceContext context) { - List aggregationsList = new ArrayList<>(buckets.size()); - long docCount = 0; - for (B bucket : buckets) { - docCount += bucket.docCount; - aggregationsList.add(bucket.aggregations); - } - final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - return buildBucket(this, hashAsLong, docCount, aggs); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java index 7c874781d0c22..31650fa820b1f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java @@ -57,6 +57,11 @@ InternalGeoGrid create(String name, int requiredSize, List buckets, List list, M return new InternalGeoHashGrid(name, requiredSize, buckets, list, metaData); } + @Override + InternalGeoHashGridBucket createBucket(long hashAsLong, long docCount, InternalAggregations aggregations) { + return new InternalGeoHashGridBucket(hashAsLong, docCount, aggregations); + } + @Override Reader getBucketReader() { return InternalGeoHashGridBucket::new; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridBucket.java index 82f2f70c04c16..add34b0796ada 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGridBucket.java @@ -37,12 +37,6 @@ public InternalGeoHashGridBucket(StreamInput in) throws IOException { super(in); } - @Override - InternalGeoHashGridBucket buildBucket(InternalGeoGridBucket bucket, long hashAsLong, long docCount, - InternalAggregations aggregations) { - return new InternalGeoHashGridBucket(hashAsLong, docCount, aggregations); - } - @Override public String getKeyAsString() { return Geohash.stringEncode(hashAsLong); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGrid.java index 8a842b66dcfca..4c11feb94f3d9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGrid.java @@ -57,6 +57,11 @@ InternalGeoGrid create(String name, int requiredSize, List buckets, List list, M return new InternalGeoTileGrid(name, requiredSize, buckets, list, metaData); } + @Override + InternalGeoTileGridBucket createBucket(long hashAsLong, long docCount, InternalAggregations aggregations) { + return new InternalGeoTileGridBucket(hashAsLong, docCount, aggregations); + } + @Override Reader getBucketReader() { return InternalGeoTileGridBucket::new; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGridBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGridBucket.java index fb9afbaaca4f8..20edc4e904db3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGridBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoTileGridBucket.java @@ -37,12 +37,6 @@ public InternalGeoTileGridBucket(StreamInput in) throws IOException { super(in); } - @Override - InternalGeoTileGridBucket buildBucket(InternalGeoGridBucket bucket, long hashAsLong, long docCount, - InternalAggregations aggregations) { - return new InternalGeoTileGridBucket(hashAsLong, docCount, aggregations); - } - @Override public String getKeyAsString() { return GeoTileUtils.stringEncode(hashAsLong); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index 3be59c0c1f7ec..c776d764637dd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -121,17 +121,6 @@ public Aggregations getAggregations() { return aggregations; } - Bucket reduce(List buckets, Rounding rounding, ReduceContext context) { - List aggregations = new ArrayList<>(buckets.size()); - long docCount = 0; - for (Bucket bucket : buckets) { - docCount += bucket.docCount; - aggregations.add((InternalAggregations) bucket.getAggregations()); - } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); - return new InternalAutoDateHistogram.Bucket(rounding.round(key), docCount, format, aggs); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { String keyAsString = format.format(key).toString(); @@ -328,14 +317,14 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (pq.size() > 0) { // list of buckets coming from different shards that have the same key List currentBuckets = new ArrayList<>(); - double key = reduceRounding.round(pq.top().current.key); + long key = reduceRounding.round(pq.top().current.key); do { final IteratorAndCurrent top = pq.top(); if (reduceRounding.round(top.current.key) != key) { // the key changes, reduce what we already buffered and reset the buffer for current buckets - final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext); + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); currentBuckets.clear(); @@ -355,7 +344,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { } while (pq.size() > 0); if (currentBuckets.isEmpty() == false) { - final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext); + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); } @@ -391,7 +380,7 @@ private List mergeBuckets(List reducedBuckets, Rounding reduceRo sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations)); } else { reduceContext.consumeBucketsAndMaybeBreak(1); - mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext)); + mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); sameKeyedBuckets.clear(); key = roundedBucketKey; reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); @@ -400,12 +389,25 @@ private List mergeBuckets(List reducedBuckets, Rounding reduceRo } if (sameKeyedBuckets.isEmpty() == false) { reduceContext.consumeBucketsAndMaybeBreak(1); - mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext)); + mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); } reducedBuckets = mergedBuckets; return reducedBuckets; } + @Override + protected Bucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + List aggregations = new ArrayList<>(buckets.size()); + long docCount = 0; + for (Bucket bucket : buckets) { + docCount += bucket.docCount; + aggregations.add((InternalAggregations) bucket.getAggregations()); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + return new InternalAutoDateHistogram.Bucket(buckets.get(0).key, docCount, format, aggs); + } + private static class BucketReduceResult { List buckets; RoundingInfo roundingInfo; @@ -547,7 +549,7 @@ private BucketReduceResult mergeConsecutiveBuckets(List reducedBuckets, Bucket bucket = reducedBuckets.get(i); if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) { reduceContext.consumeBucketsAndMaybeBreak(1); - mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, roundingInfo.rounding, reduceContext)); + mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); sameKeyedBuckets.clear(); key = roundingInfo.rounding.round(bucket.key); } @@ -556,7 +558,7 @@ private BucketReduceResult mergeConsecutiveBuckets(List reducedBuckets, } if (sameKeyedBuckets.isEmpty() == false) { reduceContext.consumeBucketsAndMaybeBreak(1); - mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, roundingInfo.rounding, reduceContext)); + mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); } return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index c21e6bde361a0..69c8552af32d8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -125,17 +125,6 @@ public Aggregations getAggregations() { return aggregations; } - Bucket reduce(List buckets, ReduceContext context) { - List aggregations = new ArrayList<>(buckets.size()); - long docCount = 0; - for (Bucket bucket : buckets) { - docCount += bucket.docCount; - aggregations.add((InternalAggregations) bucket.getAggregations()); - } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); - return new InternalDateHistogram.Bucket(key, docCount, keyed, format, aggs); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { String keyAsString = format.format(key).toString(); @@ -342,7 +331,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (top.current.key != key) { // the key changes, reduce what we already buffered and reset the buffer for current buckets - final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); @@ -366,7 +355,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { } while (pq.size() > 0); if (currentBuckets.isEmpty() == false) { - final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); @@ -379,6 +368,23 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { return reducedBuckets; } + /** + * Reduce a list of same-keyed buckets (from multiple shards) to a single bucket. This + * requires all buckets to have the same key. + */ + @Override + protected Bucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + List aggregations = new ArrayList<>(buckets.size()); + long docCount = 0; + for (Bucket bucket : buckets) { + docCount += bucket.docCount; + aggregations.add((InternalAggregations) bucket.getAggregations()); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + return createBucket(buckets.get(0).key, docCount, aggs); + } + private void addEmptyBuckets(List list, ReduceContext reduceContext) { Bucket lastBucket = null; ExtendedBounds bounds = emptyBucketInfo.bounds; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index beaec315b7e94..e814fbca290d9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -121,17 +121,6 @@ public Aggregations getAggregations() { return aggregations; } - Bucket reduce(List buckets, ReduceContext context) { - List aggregations = new ArrayList<>(buckets.size()); - long docCount = 0; - for (Bucket bucket : buckets) { - docCount += bucket.docCount; - aggregations.add((InternalAggregations) bucket.getAggregations()); - } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); - return new InternalHistogram.Bucket(key, docCount, keyed, format, aggs); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { String keyAsString = format.format(key).toString(); @@ -324,7 +313,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (Double.compare(top.current.key, key) != 0) { // The key changes, reduce what we already buffered and reset the buffer for current buckets. // Using Double.compare instead of != to handle NaN correctly. - final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); @@ -348,7 +337,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { } while (pq.size() > 0); if (currentBuckets.isEmpty() == false) { - final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); + final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); @@ -361,6 +350,19 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { return reducedBuckets; } + @Override + protected Bucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + List aggregations = new ArrayList<>(buckets.size()); + long docCount = 0; + for (Bucket bucket : buckets) { + docCount += bucket.docCount; + aggregations.add((InternalAggregations) bucket.getAggregations()); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + return createBucket(buckets.get(0).key, docCount, aggs); + } + private double nextKey(double key) { return round(key + emptyBucketInfo.interval + emptyBucketInfo.interval / 2); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java index a9d4edce9dd5f..4b3b29d6016b9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -264,6 +265,14 @@ public InternalAggregation doReduce(List aggregations, Redu return new InternalBinaryRange(name, format, keyed, buckets, pipelineAggregators(), metaData); } + @Override + protected Bucket reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + List aggregationsList = buckets.stream().map(bucket -> bucket.aggregations).collect(Collectors.toList()); + final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + return createBucket(aggs, buckets.get(0)); + } + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index 70e6b78ae0cbf..45003df5370b1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -122,17 +122,6 @@ public Aggregations getAggregations() { return FACTORY; } - Bucket reduce(List ranges, ReduceContext context) { - long docCount = 0; - List aggregationsList = new ArrayList<>(ranges.size()); - for (Bucket range : ranges) { - docCount += range.docCount; - aggregationsList.add(range.aggregations); - } - final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - return getFactory().createBucket(key, from, to, docCount, aggs, keyed, format); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (keyed) { @@ -307,25 +296,39 @@ public B createBucket(InternalAggregations aggregations, B prototype) { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { reduceContext.consumeBucketsAndMaybeBreak(ranges.size()); - List[] rangeList = new List[ranges.size()]; + List[] rangeList = new List[ranges.size()]; for (int i = 0; i < rangeList.length; ++i) { rangeList[i] = new ArrayList<>(); } for (InternalAggregation aggregation : aggregations) { InternalRange ranges = (InternalRange) aggregation; int i = 0; - for (Bucket range : ranges.ranges) { + for (B range : ranges.ranges) { rangeList[i++].add(range); } } final List ranges = new ArrayList<>(); for (int i = 0; i < this.ranges.size(); ++i) { - ranges.add((B) rangeList[i].get(0).reduce(rangeList[i], reduceContext)); + ranges.add((B) reduceBucket(rangeList[i], reduceContext)); } return getFactory().create(name, ranges, format, keyed, pipelineAggregators(), getMetaData()); } + @Override + protected B reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + long docCount = 0; + List aggregationsList = new ArrayList<>(buckets.size()); + for (Bucket bucket : buckets) { + docCount += bucket.docCount; + aggregationsList.add(bucket.aggregations); + } + final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + Bucket prototype = buckets.get(0); + return getFactory().createBucket(prototype.key, prototype.from, prototype.to, docCount, aggs, keyed, format); + } + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { if (keyed) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java index 5d8bc893a2604..49c2718baaf26 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -119,21 +119,6 @@ public Aggregations getAggregations() { return aggregations; } - public B reduce(List buckets, ReduceContext context) { - long subsetDf = 0; - long supersetDf = 0; - List aggregationsList = new ArrayList<>(buckets.size()); - for (B bucket : buckets) { - subsetDf += bucket.subsetDf; - supersetDf += bucket.supersetDf; - aggregationsList.add(bucket.aggregations); - } - InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs); - } - - abstract B newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations); - @Override public double getSignificanceScore() { return score; @@ -229,8 +214,8 @@ public InternalAggregation doReduce(List aggregations, Redu } // Adjust the buckets with the global stats representing the // total size of the pots from which the stats are drawn - existingBuckets.add(bucket.newBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize, - bucket.aggregations)); + existingBuckets.add(createBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize, + bucket.aggregations, bucket)); } } SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext); @@ -238,7 +223,7 @@ public InternalAggregation doReduce(List aggregations, Redu BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); - final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); + final B b = reduceBucket(sameTermBuckets, reduceContext); b.updateScore(heuristic); if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) { B removed = ordered.insertWithOverflow(b); @@ -258,6 +243,24 @@ public InternalAggregation doReduce(List aggregations, Redu return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list)); } + @Override + protected B reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + long subsetDf = 0; + long supersetDf = 0; + List aggregationsList = new ArrayList<>(buckets.size()); + for (B bucket : buckets) { + subsetDf += bucket.subsetDf; + supersetDf += bucket.supersetDf; + aggregationsList.add(bucket.aggregations); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + return createBucket(subsetDf, buckets.get(0).subsetSize, supersetDf, buckets.get(0).supersetSize, aggs, buckets.get(0)); + } + + abstract B createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, + InternalAggregations aggregations, B prototype); + protected abstract A create(long subsetSize, long supersetSize, List buckets); /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java index fd4eec825774e..582346f529a8a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java @@ -86,11 +86,6 @@ public Number getKeyAsNumber() { return term; } - @Override - Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { - return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, format); - } - @Override protected XContentBuilder keyToXContent(XContentBuilder builder) throws IOException { builder.field(CommonFields.KEY.getPreferredName(), term); @@ -152,4 +147,10 @@ protected SignificantLongTerms create(long subsetSize, long supersetSize, List aggregations, ReduceContext reduceContext) { return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index 65e684e315027..8bc0e83c8d6a6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -81,11 +81,6 @@ public int compareKey(Bucket other) { return Double.compare(term, other.term); } - @Override - Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) { - return new Bucket(term, docCount, aggs, showDocCountError, docCountError, format); - } - @Override protected final XContentBuilder keyToXContent(XContentBuilder builder) throws IOException { builder.field(CommonFields.KEY.getPreferredName(), term); @@ -175,4 +170,9 @@ public InternalAggregation doReduce(List aggregations, Redu } return newAggs.get(0).doReduce(newAggs, reduceContext); } + + @Override + Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError, DoubleTerms.Bucket prototype) { + return new Bucket(prototype.term, docCount, aggs, prototype.showDocCountError, docCountError, format); + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 03eb00337e9c1..24c1846bbffbe 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -270,11 +270,6 @@ public Object getKey() { throw new UnsupportedOperationException(); } - @Override - OrdBucket newBucket(long docCount, InternalAggregations aggs, long docCountError) { - throw new UnsupportedOperationException(); - } - @Override public Number getKeyAsNumber() { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java index d774d09fa1862..7711946226470 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java @@ -128,7 +128,7 @@ public InternalAggregation doReduce(List aggregations, Redu final List rare = new ArrayList<>(); for (List sameTermBuckets : buckets.values()) { - final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); + final B b = reduceBucket(sameTermBuckets, reduceContext); if ((b.getDocCount() <= maxDocCount && containsTerm(filter, b) == false)) { rare.add(b); reduceContext.consumeBucketsAndMaybeBreak(1); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java index dd1a0c19200cf..ae9f8e27ec6ae 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java @@ -91,19 +91,6 @@ public Aggregations getAggregations() { return aggregations; } - abstract B newBucket(long docCount, InternalAggregations aggs); - - public B reduce(List buckets, ReduceContext context) { - long docCount = 0; - List aggregationsList = new ArrayList<>(buckets.size()); - for (B bucket : buckets) { - docCount += bucket.docCount; - aggregationsList.add(bucket.aggregations); - } - InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - return newBucket(docCount, aggs); - } - @Override public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -171,6 +158,21 @@ public InternalAggregation doReduce(List aggregations, Redu throw new UnsupportedOperationException(); } + abstract B createBucket(long docCount, InternalAggregations aggs, B prototype); + + @Override + protected B reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + long docCount = 0; + List aggregationsList = new ArrayList<>(buckets.size()); + for (B bucket : buckets) { + docCount += bucket.docCount; + aggregationsList.add(bucket.aggregations); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + return createBucket(docCount, aggs, buckets.get(0)); + } + protected abstract A createWithFilter(String name, List buckets, SetBackedScalingCuckooFilter filter); /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index fc607621ff7ed..3eefc9bee0100 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -118,31 +118,6 @@ public Aggregations getAggregations() { return aggregations; } - abstract B newBucket(long docCount, InternalAggregations aggs, long docCountError); - - public B reduce(List buckets, ReduceContext context) { - long docCount = 0; - // For the per term doc count error we add up the errors from the - // shards that did not respond with the term. To do this we add up - // the errors from the shards that did respond with the terms and - // subtract that from the sum of the error from all shards - long docCountError = 0; - List aggregationsList = new ArrayList<>(buckets.size()); - for (B bucket : buckets) { - docCount += bucket.docCount; - if (docCountError != -1) { - if (bucket.docCountError == -1) { - docCountError = -1; - } else { - docCountError += bucket.docCountError; - } - } - aggregationsList.add(bucket.aggregations); - } - InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - return newBucket(docCount, aggs, docCountError); - } - @Override public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -283,7 +258,7 @@ public InternalAggregation doReduce(List aggregations, Redu final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator(null)); for (List sameTermBuckets : buckets.values()) { - final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); + final B b = reduceBucket(sameTermBuckets, reduceContext); if (sumDocCountError == -1) { b.docCountError = -1; } else { @@ -314,6 +289,31 @@ public InternalAggregation doReduce(List aggregations, Redu return create(name, Arrays.asList(list), docCountError, otherDocCount); } + @Override + protected B reduceBucket(List buckets, ReduceContext context) { + assert buckets.size() > 0; + long docCount = 0; + // For the per term doc count error we add up the errors from the + // shards that did not respond with the term. To do this we add up + // the errors from the shards that did respond with the terms and + // subtract that from the sum of the error from all shards + long docCountError = 0; + List aggregationsList = new ArrayList<>(buckets.size()); + for (B bucket : buckets) { + docCount += bucket.docCount; + if (docCountError != -1) { + if (bucket.docCountError == -1) { + docCountError = -1; + } else { + docCountError += bucket.docCountError; + } + } + aggregationsList.add(bucket.aggregations); + } + InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + return createBucket(docCount, aggs, docCountError, buckets.get(0)); + } + protected abstract void setDocCountError(long docCountError); protected abstract int getShardSize(); @@ -325,6 +325,8 @@ public InternalAggregation doReduce(List aggregations, Redu */ protected abstract B[] createBucketsArray(int size); + abstract B createBucket(long docCount, InternalAggregations aggs, long docCountError, B prototype); + @Override public boolean equals(Object obj) { if (this == obj) return true; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTerms.java index 29f84fb6030e1..83552b8e078eb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTerms.java @@ -80,11 +80,6 @@ public int compareKey(Bucket other) { return Long.compare(term, other.term); } - @Override - Bucket newBucket(long docCount, InternalAggregations aggs) { - return new Bucket(term, docCount, aggs, format); - } - @Override protected final XContentBuilder keyToXContent(XContentBuilder builder) throws IOException { builder.field(CommonFields.KEY.getPreferredName(), term); @@ -153,4 +148,9 @@ public boolean containsTerm(SetBackedScalingCuckooFilter filter, LongRareTerms.B public void addToFilter(SetBackedScalingCuckooFilter filter, LongRareTerms.Bucket bucket) { filter.add((long) bucket.getKey()); } + + @Override + Bucket createBucket(long docCount, InternalAggregations aggs, LongRareTerms.Bucket prototype) { + return new Bucket(prototype.term, docCount, aggs, format); + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 7857df8713b6e..6a0fcde1fa053 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -81,11 +81,6 @@ public int compareKey(Bucket other) { return Long.compare(term, other.term); } - @Override - Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) { - return new Bucket(term, docCount, aggs, showDocCountError, docCountError, format); - } - @Override protected final XContentBuilder keyToXContent(XContentBuilder builder) throws IOException { builder.field(CommonFields.KEY.getPreferredName(), term); @@ -158,6 +153,11 @@ public InternalAggregation doReduce(List aggregations, Redu return super.doReduce(aggregations, reduceContext); } + @Override + Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError, LongTerms.Bucket prototype) { + return new Bucket(prototype.term, docCount, aggs, prototype.showDocCountError, docCountError, format); + } + /** * Converts a {@link LongTerms} into a {@link DoubleTerms}, returning the value of the specified long terms as doubles. */ diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTerms.java index 3c3e19664a631..384e20bf69f55 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTerms.java @@ -86,11 +86,6 @@ public int compareKey(Bucket other) { return termBytes.compareTo(other.termBytes); } - @Override - Bucket newBucket(long docCount, InternalAggregations aggs) { - return new Bucket(termBytes, docCount, aggs, format); - } - @Override protected final XContentBuilder keyToXContent(XContentBuilder builder) throws IOException { return builder.field(CommonFields.KEY.getPreferredName(), getKeyAsString()); @@ -156,4 +151,10 @@ public boolean containsTerm(SetBackedScalingCuckooFilter filter, StringRareTerms public void addToFilter(SetBackedScalingCuckooFilter filter, StringRareTerms.Bucket bucket) { filter.add(bucket.termBytes); } + + @Override + Bucket createBucket(long docCount, InternalAggregations aggs, StringRareTerms.Bucket prototype) { + return new Bucket(prototype.termBytes, docCount, aggs, format); + } + } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java index 4971f74f03dc5..afbdf4bc70293 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java @@ -88,11 +88,6 @@ public int compareKey(Bucket other) { return termBytes.compareTo(other.termBytes); } - @Override - Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) { - return new Bucket(termBytes, docCount, aggs, showDocCountError, docCountError, format); - } - @Override protected final XContentBuilder keyToXContent(XContentBuilder builder) throws IOException { return builder.field(CommonFields.KEY.getPreferredName(), getKeyAsString()); @@ -140,6 +135,11 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) prototype.format); } + @Override + Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError, StringTerms.Bucket prototype) { + return new Bucket(prototype.termBytes, docCount, aggs, prototype.showDocCountError, docCountError, format); + } + @Override protected StringTerms create(String name, List buckets, long docCountError, long otherDocCount) { return new StringTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), getMetaData(), format, shardSize, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java index eff5441a1d7e7..c4a019e6fe9b2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java @@ -82,6 +82,11 @@ public UnmappedRareTerms.Bucket createBucket(InternalAggregations aggregations, throw new UnsupportedOperationException("not supported for UnmappedRareTerms"); } + @Override + UnmappedRareTerms.Bucket createBucket(long docCount, InternalAggregations aggs, Bucket prototype) { + throw new UnsupportedOperationException("not supported for UnmappedRareTerms"); + } + @Override protected UnmappedRareTerms createWithFilter(String name, List buckets, SetBackedScalingCuckooFilter filter) { throw new UnsupportedOperationException("not supported for UnmappedRareTerms"); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 17a3e603b6fcf..8096366f6d655 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -88,6 +88,11 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) throw new UnsupportedOperationException("not supported for UnmappedTerms"); } + @Override + Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError, Bucket prototype) { + throw new UnsupportedOperationException("not supported for UnmappedTerms"); + } + @Override protected UnmappedTerms create(String name, List buckets, long docCountError, long otherDocCount) { throw new UnsupportedOperationException("not supported for UnmappedTerms");