Skip to content

Commit

Permalink
[test][multistage] enhance query plan test (#11966)
Browse files Browse the repository at this point in the history
Co-authored-by: Rong Rong <[email protected]>
  • Loading branch information
walterddr and Rong Rong authored Nov 8, 2023
1 parent 9092244 commit b5e9823
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,27 +156,23 @@ public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context contex
MailboxSendNode sender = (MailboxSendNode) node.getSender();
int senderStageId = node.getSenderStageId();
DispatchablePlanFragment dispatchablePlanFragment = _dispatchableSubPlan.getQueryStageList().get(senderStageId);
Map<Integer, Map<String, List<String>>> segments = dispatchablePlanFragment.getWorkerIdToSegmentsMap();

Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
dispatchablePlanFragment.getServerInstanceToWorkerIdMap();
Iterator<QueryServerInstance> iterator = serverInstanceToWorkerIdMap.keySet().iterator();
while (iterator.hasNext()) {
QueryServerInstance queryServerInstance = iterator.next();
for (int workerId : serverInstanceToWorkerIdMap.get(queryServerInstance)) {
if (segments.containsKey(workerId)) {
// always print out leaf stages
sender.visit(this, context.next(iterator.hasNext(), queryServerInstance, workerId));
List<Integer> workerIdList = serverInstanceToWorkerIdMap.get(queryServerInstance);
for (int idx = 0; idx < workerIdList.size(); idx++) {
int workerId = workerIdList.get(idx);
if (!iterator.hasNext() && idx == workerIdList.size() - 1) {
// always print out the last one
sender.visit(this, context.next(false, queryServerInstance, workerId));
} else {
if (!iterator.hasNext()) {
// always print out the last one
sender.visit(this, context.next(false, queryServerInstance, workerId));
} else {
// only print short version of the sender node
appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
.append(" (Subtree Omitted)")
.append('\n');
}
// only print short version of the sender node
appendMailboxSend(sender, context.next(true, queryServerInstance, workerId))
.append(" (Subtree Omitted)")
.append('\n');
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ public void testQueryPlanExplainLogical(String query, String digest)
testQueryPlanExplain(query, digest);
}

@Test(dataProvider = "testQueryPhysicalPlanDataProvider")
public void testQueryPlanExplainPhysical(String query, String digest)
throws Exception {
testQueryPlanExplain(query, digest);
}

private void testQueryPlanExplain(String query, String digest) {
try {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
Expand Down Expand Up @@ -466,49 +460,4 @@ private Object[][] provideQueriesWithExplainedLogicalPlan() {
};
//@formatter:on
}

@DataProvider(name = "testQueryPhysicalPlanDataProvider")
private Object[][] provideQueriesWithExplainedPhysicalPlan() {
//@formatter:off
return new Object[][] {
new Object[]{"EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ "│ └── [1]@localhost:1 PROJECT\n"
+ "│ └── [1]@localhost:1 TABLE SCAN (a) null\n"
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ " └── [1]@localhost:2 PROJECT\n"
+ " └── [1]@localhost:2 TABLE SCAN (a) null\n"},
new Object[]{"EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ " └── [1]@localhost:2 AGGREGATE_FINAL\n"
+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+ " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " │ └── [2]@localhost:1 AGGREGATE_LEAF\n"
+ " │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
+ " └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " └── [2]@localhost:2 AGGREGATE_LEAF\n"
+ " └── [2]@localhost:2 TABLE SCAN (a) null\n"},
new Object[]{"EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n"
+ "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n"
+ "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n"
+ " └── [1]@localhost:2 PROJECT\n"
+ " └── [1]@localhost:2 JOIN\n"
+ " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+ " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " │ │ └── [2]@localhost:1 PROJECT\n"
+ " │ │ └── [2]@localhost:1 TABLE SCAN (a) null\n"
+ " │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " │ └── [2]@localhost:2 PROJECT\n"
+ " │ └── [2]@localhost:2 TABLE SCAN (a) null\n"
+ " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n"
+ " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n"
+ " └── [3]@localhost:1 PROJECT\n"
+ " └── [3]@localhost:1 TABLE SCAN (b) null\n"}
};
//@formatter:on
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -57,6 +58,23 @@ public class QueryEnvironmentTestBase {
"d_REALTIME", ImmutableList.of("d2"), "d_OFFLINE", ImmutableList.of("d3"), "e_REALTIME",
ImmutableList.of("e2"), "e_OFFLINE", ImmutableList.of("e3"));
public static final Map<String, Schema> TABLE_SCHEMAS = new HashMap<>();
public static final Map<String, Pair<String, List<List<String>>>> PARTITIONED_SEGMENTS_MAP = new HashMap<>();
public static final int PARTITION_COUNT = 4;
public static final Map<String, String> PARTITIONED_TABLES =
ImmutableMap.of("a_REALTIME", "col2", "b_REALTIME", "col1");
static {
  // Populate PARTITIONED_SEGMENTS_MAP: for every partitioned table, pair its partition column with a
  // PARTITION_COUNT-sized list of segment lists. Index 0 holds the table's SERVER1 segments, index 1 its
  // SERVER2 segments, and every remaining index is an empty list.
  PARTITIONED_TABLES.forEach((table, column) -> {
    List<List<String>> segmentsByPartition = new ArrayList<>(PARTITION_COUNT);
    segmentsByPartition.add(SERVER1_SEGMENTS.getOrDefault(table, Collections.emptyList()));
    segmentsByPartition.add(SERVER2_SEGMENTS.getOrDefault(table, Collections.emptyList()));
    // pad the remaining partitions so the list always has one entry per partition id
    while (segmentsByPartition.size() < PARTITION_COUNT) {
      segmentsByPartition.add(new ArrayList<>());
    }
    PARTITIONED_SEGMENTS_MAP.put(table, Pair.of(column, segmentsByPartition));
  });
}

static {
TABLE_SCHEMAS.put("a_REALTIME", getSchemaBuilder("a").build());
Expand Down Expand Up @@ -84,7 +102,8 @@ static Schema.SchemaBuilder getSchemaBuilder(String schemaName) {
@BeforeClass
public void setUp() {
// the port doesn't matter as we are not actually making a server call.
_queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, null);
_queryEnvironment =
getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, SERVER1_SEGMENTS, SERVER2_SEGMENTS, PARTITIONED_SEGMENTS_MAP);
}

@DataProvider(name = "testQueryDataProvider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String EXPLAIN_REGEX =
"EXPLAIN (IMPLEMENTATION )*PLAN (INCLUDING |EXCLUDING )*(ALL )*(ATTRIBUTES )*(AS DOT |AS JSON |AS TEXT )*FOR ";
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";

Expand All @@ -51,7 +53,8 @@ public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, Str
String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
Assert.assertEquals(explainedPlan, output,
String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output));
String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
// use a regex to strip the leading "EXPLAIN ... PLAN ... FOR " clause so that only the bare query is planned
String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(queryWithoutExplainPlan);
Assert.assertNotNull(dispatchableSubPlan,
String.format("Test case %s for query %s should not have a null QueryPlan",
Expand All @@ -66,7 +69,7 @@ public void testQueryExplainPlansWithExceptions(String testCaseName, String quer
try {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
_queryEnvironment.explainQuery(query, requestId);
String queryWithoutExplainPlan = query.replace("EXPLAIN PLAN FOR ", "");
String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
_queryEnvironment.planQuery(queryWithoutExplainPlan);
Assert.fail("Query compilation should have failed with exception message pattern: " + expectedException);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"physical_plan_explain_formats": {
"queries": [
{
"description": "explain plan with attributes",
"sql": "EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:2 PROJECT\n",
" └── [1]@localhost:2 TABLE SCAN (a) null\n"
]
},
{
"description": "explain plan without attributes",
"sql": "EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:2 AGGREGATE_FINAL\n",
" └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
" └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
" └── [2]@localhost:2 AGGREGATE_LEAF\n",
" └── [2]@localhost:2 TABLE SCAN (a) null\n"
]
},
{
"description": "explain plan with join",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:2 PROJECT\n",
" └── [1]@localhost:2 JOIN\n",
" ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
" │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
" │ └── [2]@localhost:2 PROJECT\n",
" │ └── [2]@localhost:2 TABLE SCAN (a) null\n",
" └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
" └── [3]@localhost:1 PROJECT\n",
" └── [3]@localhost:1 TABLE SCAN (b) null\n"
]
},
{
"description": "explain plan with join with colocated tables",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
"output": [
"[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
"├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
"└── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
" └── [1]@localhost:1 PROJECT\n",
" └── [1]@localhost:1 JOIN\n",
" ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" │ └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
" │ └── [2]@localhost:1 PROJECT\n",
" │ └── [2]@localhost:1 TABLE SCAN (a) null\n",
" └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" ├── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
" └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
" └── [3]@localhost:1 PROJECT\n",
" └── [3]@localhost:1 FILTER\n",
" └── [3]@localhost:1 TABLE SCAN (b) null\n"
]
}
]
}
}

0 comments on commit b5e9823

Please sign in to comment.