diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java index d0f06c18f915..832b9beede0a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.plan; +import java.util.concurrent.TimeoutException; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.operator.InstanceResponseOperator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; @@ -45,11 +46,15 @@ public PlanNode getPlanNode() { } @Override - public DataTable execute() { + public DataTable execute() + throws TimeoutException { long startTime = System.currentTimeMillis(); InstanceResponseOperator instanceResponseOperator = _instanceResponsePlanNode.run(); long endTime1 = System.currentTimeMillis(); LOGGER.debug("InstanceResponsePlanNode.run() took: {}ms", endTime1 - startTime); + if (endTime1 > _instanceResponsePlanNode._queryContext.getEndTimeMs()) { + throw new TimeoutException("Query timed out while generating physical execution plan"); + } InstanceResponseBlock instanceResponseBlock = instanceResponseOperator.nextBlock(); long endTime2 = System.currentTimeMillis(); LOGGER.debug("InstanceResponseOperator.nextBlock() took: {}ms", endTime2 - endTime1); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java index 6ea877064d14..00ffe0674ee0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.plan; +import java.util.concurrent.TimeoutException; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.spi.annotations.InterfaceAudience; @@ -32,5 +33,6 @@ public interface Plan { PlanNode getPlanNode(); /** Execute the query plan and get the instance response. */ - DataTable execute(); + DataTable execute() + throws TimeoutException; } diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index 2ce3d903d703..e73069eb69e9 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.PinotQuery; @@ -196,8 +197,13 @@ private BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery, PlanMaker // Server side serverQueryContext.setEndTimeMs(System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); Plan plan = planMaker.makeInstancePlan(getIndexSegments(), serverQueryContext, EXECUTOR_SERVICE, null); - DataTable instanceResponse = - queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute(); + DataTable instanceResponse; + try { + instanceResponse = + queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } // Broker side // Use 2 Threads for 2 data-tables @@ -264,10 +270,20 @@ private BrokerResponseNative getBrokerResponseDistinctInstances(PinotQuery pinot Plan plan1 = planMaker.makeInstancePlan(instances.get(0), serverQueryContext, EXECUTOR_SERVICE, null); Plan plan2 = planMaker.makeInstancePlan(instances.get(1), serverQueryContext, EXECUTOR_SERVICE, null); - DataTable instanceResponse1 = - queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan1) : plan1.execute(); - DataTable instanceResponse2 = - queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan2) : plan2.execute(); + DataTable instanceResponse1; + try { + instanceResponse1 = + queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan1) : plan1.execute(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + DataTable instanceResponse2; + try { + instanceResponse2 = + queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan2) : plan2.execute(); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } // Broker side // Use 2 Threads for 2 data-tables