From 2c7168d7e464902f09ff204ca4f4da48c79431c9 Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Thu, 29 Sep 2022 15:10:16 -0700 Subject: [PATCH 1/7] add query interruption flag check to broker groupby reduction --- .../broker/requesthandler/BaseBrokerRequestHandler.java | 6 +++--- .../pinot/core/query/reduce/GroupByDataTableReducer.java | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 7a71a59a639b..5b27b2da4ee4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -694,9 +694,9 @@ private BrokerResponseNative handleRequest(long requestId, String query, LOGGER.debug("Remove track of running query: {}", requestId); } } else { - brokerResponse = - processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, - realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); + brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, + offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, + requestContext); } brokerResponse.setExceptions(exceptions); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 1e2d6c18b07b..37ca13a56c23 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -55,6 +55,7 @@ import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.core.util.trace.TraceRunnable; +import org.apache.pinot.spi.exception.EarlyTerminationException; import org.roaringbitmap.RoaringBitmap; @@ -64,6 +65,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public class GroupByDataTableReducer implements DataTableReducer { private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value. + private static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK = 10_000; private final QueryContext _queryContext; private final AggregationFunction[] _aggregationFunctions; @@ -289,6 +291,9 @@ public void runJob() { int numRows = dataTable.getNumberOfRows(); for (int rowId = 0; rowId < numRows; rowId++) { + if (rowId % MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) { + throw new EarlyTerminationException(); + } Object[] values = new Object[_numColumns]; for (int colId = 0; colId < _numColumns; colId++) { switch (storedColumnDataTypes[colId]) { From 8bd5524af7041a47653dda220a38eef9c0efb912 Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Thu, 29 Sep 2022 16:38:07 -0700 Subject: [PATCH 2/7] add query interruption flag check to broker groupby reduction --- .../apache/pinot/core/query/reduce/GroupByDataTableReducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 37ca13a56c23..1c5e21a9423c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -292,7 +292,7 @@ public void runJob() { int numRows = dataTable.getNumberOfRows(); for (int rowId = 0; rowId < numRows; rowId++) { if (rowId % MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) { - throw new EarlyTerminationException(); + return; } Object[] values = new Object[_numColumns]; for (int colId = 0; colId < _numColumns; colId++) { From ad6621d775f5241d00a622da62d83ecca745b0ec Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Thu, 29 Sep 2022 17:07:48 -0700 Subject: [PATCH 3/7] add query interruption flag check to broker groupby reduction --- .../apache/pinot/core/query/reduce/GroupByDataTableReducer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 1c5e21a9423c..dd0247696f64 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -55,7 +55,6 @@ import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.core.util.trace.TraceRunnable; -import org.apache.pinot.spi.exception.EarlyTerminationException; import org.roaringbitmap.RoaringBitmap; From e421800e806fff19982c31181ab945017e540b0a Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Thu, 29 Sep 2022 19:25:25 -0700 Subject: [PATCH 4/7] add benchmark --- .../query/reduce/GroupByDataTableReducer.java | 4 +- .../BenchmarkThreadInterruptionCheck.java | 68 +++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index dd0247696f64..a00fcb165f20 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -64,7 +64,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public class GroupByDataTableReducer implements DataTableReducer { private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value. - private static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK = 10_000; + private static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK = 0b1111_11111_11111; // checks every 32768 rows private final QueryContext _queryContext; private final AggregationFunction[] _aggregationFunctions; @@ -290,7 +290,7 @@ public void runJob() { int numRows = dataTable.getNumberOfRows(); for (int rowId = 0; rowId < numRows; rowId++) { - if (rowId % MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) { + if ((rowId & MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK) == 0 && Thread.interrupted()) { return; } Object[] values = new Object[_numColumns]; diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java new file mode 100644 index 000000000000..e8e5aa3810c1 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + + +@State(Scope.Benchmark) +public class BenchmarkThreadInterruptionCheck { + + static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK = 0b1111_11111_11111; + + public static void main(String[] args) + throws RunnerException { + Options opt = + new OptionsBuilder().include(BenchmarkThreadInterruptionCheck.class.getSimpleName()) + .warmupTime(TimeValue.seconds(5)) + .warmupIterations(3).measurementTime(TimeValue.seconds(5)).measurementIterations(5).forks(1).build(); + + new Runner(opt).run(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void benchMaskingTime(Blackhole bh) { + for (int i = 0; i < 1000; i++) { + bh.consume((i & MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK) == 0); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void benchInterruptionCheckTime(Blackhole bh) { + for (int i = 0; i < 1000; i++) { + bh.consume(Thread.interrupted()); + } + } +} From 352cedbe3505b5e3025b0385e82a8c256bcc8e5f Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Thu, 29 Sep 2022 21:26:40 -0700 Subject: [PATCH 5/7] tiled loop --- .../query/reduce/GroupByDataTableReducer.java | 89 ++++++++++--------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index a00fcb165f20..6784f7789fe8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -64,7 +64,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public class GroupByDataTableReducer implements DataTableReducer { private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value. - private static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK = 0b1111_11111_11111; // checks every 32768 rows + private static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK = 10_000; private final QueryContext _queryContext; private final AggregationFunction[] _aggregationFunctions; @@ -289,54 +289,57 @@ public void runJob() { } int numRows = dataTable.getNumberOfRows(); - for (int rowId = 0; rowId < numRows; rowId++) { - if ((rowId & MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK) == 0 && Thread.interrupted()) { + for (int rowIdBatch = 0; rowIdBatch < numRows; rowIdBatch += MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK) { + if (Thread.interrupted()) { return; } - Object[] values = new Object[_numColumns]; - for (int colId = 0; colId < _numColumns; colId++) { - switch (storedColumnDataTypes[colId]) { - case INT: - values[colId] = dataTable.getInt(rowId, colId); - break; - case LONG: - values[colId] = dataTable.getLong(rowId, colId); - break; - case FLOAT: - values[colId] = dataTable.getFloat(rowId, colId); - break; - case DOUBLE: - values[colId] = dataTable.getDouble(rowId, colId); - break; - case BIG_DECIMAL: - values[colId] = dataTable.getBigDecimal(rowId, colId); - break; - case STRING: - values[colId] = dataTable.getString(rowId, colId); - break; - case BYTES: - values[colId] = dataTable.getBytes(rowId, colId); - break; - case OBJECT: - // TODO: Move ser/de into AggregationFunction interface - DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId); - if (customObject != null) { - values[colId] = ObjectSerDeUtils.deserialize(customObject); - } - break; - // Add other aggregation intermediate result / group-by column type supports here - default: - throw new IllegalStateException(); - } - } - if (nullHandlingEnabled) { + int upper = Math.min(rowIdBatch + MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK, numRows); + for (int rowId = rowIdBatch; rowId < upper; rowId++) { + Object[] values = new Object[_numColumns]; for (int colId = 0; colId < _numColumns; colId++) { - if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) { - values[colId] = null; + switch (storedColumnDataTypes[colId]) { + case INT: + values[colId] = dataTable.getInt(rowId, colId); + break; + case LONG: + values[colId] = dataTable.getLong(rowId, colId); + break; + case FLOAT: + values[colId] = dataTable.getFloat(rowId, colId); + break; + case DOUBLE: + values[colId] = dataTable.getDouble(rowId, colId); + break; + case BIG_DECIMAL: + values[colId] = dataTable.getBigDecimal(rowId, colId); + break; + case STRING: + values[colId] = dataTable.getString(rowId, colId); + break; + case BYTES: + values[colId] = dataTable.getBytes(rowId, colId); + break; + case OBJECT: + // TODO: Move ser/de into AggregationFunction interface + DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId); + if (customObject != null) { + values[colId] = ObjectSerDeUtils.deserialize(customObject); + } + break; + // Add other aggregation intermediate result / group-by column type supports here + default: + throw new IllegalStateException(); + } + } + if (nullHandlingEnabled) { + for (int colId = 0; colId < _numColumns; colId++) { + if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) { + values[colId] = null; + } } } + indexedTable.upsert(new Record(values)); } - indexedTable.upsert(new Record(values)); } } finally { countDownLatch.countDown(); From d8baa3c6edc6206cf32cbe95c80f8f08d5d353ea Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Thu, 29 Sep 2022 21:51:55 -0700 Subject: [PATCH 6/7] add benchmark --- .../BenchmarkThreadInterruptionCheck.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java index e8e5aa3810c1..ea4869441945 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java @@ -36,7 +36,7 @@ @State(Scope.Benchmark) public class BenchmarkThreadInterruptionCheck { - static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK = 0b1111_11111_11111; + static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK = 0b111_11111_11111; public static void main(String[] args) throws RunnerException { @@ -52,16 +52,34 @@ public static void main(String[] args) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void benchMaskingTime(Blackhole bh) { - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000000; i++) { bh.consume((i & MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK) == 0); } } + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void benchModuloTime(Blackhole bh) { + for (int i = 0; i < 1000000; i++) { + bh.consume((i % 16321) == 0); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void benchLoopTilingTime(Blackhole bh) { + for (int i = 0; i < 1000000; i += 16321) { + bh.consume(Math.min(i + 16321, 1000000)); + } + } + @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void benchInterruptionCheckTime(Blackhole bh) { - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000000; i++) { bh.consume(Thread.interrupted()); } } From 3ecdb3a2fbec2a433dcc65c9ba4b2f9ee918980d Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Thu, 29 Sep 2022 23:43:04 -0700 Subject: [PATCH 7/7] Trigger Test