diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/EmptyQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/EmptyQuickstart.java new file mode 100644 index 000000000000..150ae58bf08b --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/EmptyQuickstart.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.pinot.tools.admin.PinotAdministrator; +import org.apache.pinot.tools.admin.command.QuickstartRunner; + + +public class EmptyQuickstart extends QuickStartBase { + @Override + public List types() { + return Arrays.asList("EMPTY", "DEFAULT"); + } + + public String getAuthToken() { + return null; + } + + public Map getConfigOverrides() { + return null; + } + + public void execute() + throws Exception { + File quickstartTmpDir = new File(_dataDir.getAbsolutePath()); + File dataDir = new File(quickstartTmpDir, "rawdata"); + if (!dataDir.mkdirs()) { + printStatus(Quickstart.Color.YELLOW, "***** Bootstrapping data from existing directory *****"); + } else { + printStatus(Quickstart.Color.YELLOW, "***** Creating new data directory for fresh installation *****"); + } + + QuickstartRunner runner = + new QuickstartRunner(new ArrayList<>(), 1, 1, 1, 0, + dataDir, true, getAuthToken(), getConfigOverrides(), _zkExternalAddress, false); + + if (_zkExternalAddress != null) { + printStatus(Quickstart.Color.CYAN, "***** Starting controller, broker and server *****"); + } else { + printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****"); + } + + runner.startAll(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + printStatus(Quickstart.Color.GREEN, "***** Shutting down empty quick start *****"); + runner.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + })); + + waitForBootstrapToComplete(runner); + + printStatus(Quickstart.Color.YELLOW, "***** Empty quickstart setup complete *****"); + printStatus(Quickstart.Color.GREEN, + "You can always go to http://localhost:9000 to play around in the query console"); + } + + public static void main(String[] args) + throws Exception { + List arguments = new ArrayList<>(); + arguments.addAll(Arrays.asList("QuickStart", "-type", "EMPTY")); + arguments.addAll(Arrays.asList(args)); + PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java index ce67a0729d66..536dad4bdf18 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GenericQuickstart.java @@ -86,7 +86,7 @@ public void execute() File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); Preconditions.checkState(tempDir.mkdirs()); QuickstartTableRequest request = new QuickstartTableRequest(_tableDirectory.getAbsolutePath()); - final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, tempDir); + final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); startKafka(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java index 4d20a0a6a3fa..a2e5615ddd6c 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java @@ -84,7 +84,7 @@ public void execute(String personalAccessToken) File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); Preconditions.checkState(tempDir.mkdirs()); QuickstartTableRequest request = new QuickstartTableRequest(quickStartDataDir.getAbsolutePath()); - final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, tempDir); + final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); startKafka(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java index 39209f07ff9c..45893af5c9d7 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java @@ -41,7 +41,6 @@ import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class HybridQuickstart extends QuickStartBase { @@ -110,12 +109,13 @@ private void startKafka() { public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "airlineStats"); File dataDir = new File(baseDir, "data"); Preconditions.checkState(dataDir.mkdirs()); QuickstartTableRequest bootstrapTableRequest = prepareTableRequest(baseDir); - final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest), 1, 1, 1, dataDir); + final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest), + 1, 1, 1, 0, dataDir); printStatus(Color.YELLOW, "***** Starting Kafka *****"); startKafka(); printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java index 359ff0f1a23e..f4d9ea7469bb 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java @@ -31,7 +31,6 @@ import org.apache.pinot.tools.admin.command.QuickstartRunner; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class JoinQuickStart extends QuickStartBase { @@ -43,7 +42,7 @@ public List types() { public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); // Baseball stat table File baseBallStatsBaseDir = new File(quickstartTmpDir, "baseballStats"); @@ -81,7 +80,7 @@ public void execute() File tempDir = new File(quickstartTmpDir, "tmp"); FileUtils.forceMkdir(tempDir); - QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request, dimTableRequest), 1, 1, 3, tempDir); + QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request, dimTableRequest), 1, 1, 3, 0, tempDir); printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****"); runner.startAll(); @@ -97,9 +96,7 @@ public void execute() printStatus(Quickstart.Color.CYAN, "***** Bootstrap baseballStats table *****"); runner.bootstrapTable(); - printStatus(Quickstart.Color.CYAN, - "***** Waiting for 5 seconds for the server to fetch the assigned segment *****"); - Thread.sleep(5000); + waitForBootstrapToComplete(null); printStatus(Quickstart.Color.YELLOW, "***** Offline quickstart setup complete *****"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/JsonIndexQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/JsonIndexQuickStart.java index 4abb09e6da5e..36ba63d6ff76 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/JsonIndexQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/JsonIndexQuickStart.java @@ -31,7 +31,6 @@ import org.apache.pinot.tools.admin.command.QuickstartRunner; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class JsonIndexQuickStart extends QuickStartBase { @@ -43,7 +42,7 @@ public List types() { public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "githubEvents"); File dataDir = new File(quickstartTmpDir, "rawdata"); Preconditions.checkState(dataDir.mkdirs()); @@ -64,7 +63,7 @@ public void execute() FileUtils.copyURLToFile(resource, ingestionJobSpecFile); QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, dataDir); + final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****"); runner.startAll(); @@ -80,8 +79,7 @@ public void execute() printStatus(Color.CYAN, "***** Bootstrap githubEvents table *****"); runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****"); - Thread.sleep(5000); + waitForBootstrapToComplete(null); printStatus(Color.YELLOW, "***** Offline json-index quickstart setup complete *****"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/OfflineComplexTypeHandlingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/OfflineComplexTypeHandlingQuickStart.java index ccd70be7271e..ed3eb618b2f3 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/OfflineComplexTypeHandlingQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/OfflineComplexTypeHandlingQuickStart.java @@ -31,7 +31,6 @@ import org.apache.pinot.tools.admin.command.QuickstartRunner; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class OfflineComplexTypeHandlingQuickStart extends QuickStartBase { @@ -42,7 +41,7 @@ public List types() { public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "githubEvents"); File dataDir = new File(quickstartTmpDir, "rawdata"); Preconditions.checkState(dataDir.mkdirs()); @@ -66,7 +65,7 @@ public void execute() FileUtils.copyURLToFile(resource, ingestionJobSpecFile); QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, dataDir); + final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****"); runner.startAll(); @@ -82,8 +81,7 @@ public void execute() printStatus(Color.CYAN, "***** Bootstrap githubEvents table *****"); runner.bootstrapTable(); - printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****"); - Thread.sleep(5000); + waitForBootstrapToComplete(null); printStatus(Color.YELLOW, "***** Offline complex-type-handling quickstart setup complete *****"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java index 69baa9bb0bba..16159017c5eb 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java @@ -66,7 +66,7 @@ public void execute() FileUtils.copyURLToFile(resource, tableConfigFile); QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath()); - final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); + final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java index 634336301add..6b2686ab821c 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java @@ -21,18 +21,36 @@ import java.io.File; import java.util.List; import org.apache.commons.io.FileUtils; +import org.apache.pinot.tools.admin.command.QuickstartRunner; public abstract class QuickStartBase { - protected File _tmpDir = FileUtils.getTempDirectory(); + protected File _dataDir = FileUtils.getTempDirectory(); + protected String _zkExternalAddress; - public QuickStartBase setTmpDir(String tmpDir) { - _tmpDir = new File(tmpDir); + public QuickStartBase setDataDir(String dataDir) { + _dataDir = new File(dataDir); + return this; + } + + public QuickStartBase setZkExternalAddress(String zkExternalAddress) { + _zkExternalAddress = zkExternalAddress; return this; } public abstract List types(); + protected void waitForBootstrapToComplete(QuickstartRunner runner) + throws Exception { + QuickStartBase.printStatus(Quickstart.Color.CYAN, + "***** Waiting for 5 seconds for the server to fetch the assigned segment *****"); + Thread.sleep(5000); + } + + public static void printStatus(Quickstart.Color color, String message) { + System.out.println(color.getCode() + message + Quickstart.Color.RESET.getCode()); + } + public abstract void execute() throws Exception; } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java index 25c5985f6ddc..886085a82d67 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java @@ -44,7 +44,11 @@ public List types() { public enum Color { RESET("\u001B[0m"), GREEN("\u001B[32m"), YELLOW("\u001B[33m"), CYAN("\u001B[36m"); - private String _code; + private final String _code; + + public String getCode() { + return _code; + } Color(String code) { _code = code; @@ -67,16 +71,6 @@ public Map getConfigOverrides() { return null; } - protected void waitForBootstrapToComplete(QuickstartRunner runner) - throws Exception { - printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****"); - Thread.sleep(5000); - } - - public static void printStatus(Color color, String message) { - System.out.println(color._code + message + Color.RESET._code); - } - public static String prettyPrintResponse(JsonNode response) { StringBuilder responseBuilder = new StringBuilder(); @@ -163,7 +157,7 @@ public static String prettyPrintResponse(JsonNode response) { public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "baseballStats"); File dataDir = new File(baseDir, "rawdata"); Preconditions.checkState(dataDir.mkdirs()); @@ -190,8 +184,9 @@ public void execute() QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, getNumMinions(), dataDir, true, getAuthToken(), - getConfigOverrides()); + new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, + getNumMinions(), dataDir, true, getAuthToken(), + getConfigOverrides(), null, true); printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****"); runner.startAll(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java index 626c39e30ea5..3e45e9d10d79 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java @@ -36,7 +36,6 @@ import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase { @@ -57,7 +56,7 @@ public static void main(String[] args) public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "meetupRsvp"); File dataDir = new File(baseDir, "data"); Preconditions.checkState(dataDir.mkdirs()); @@ -75,7 +74,7 @@ public void execute() FileUtils.copyURLToFile(resource, tableConfigFile); QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); + QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java index 4bfc4fe8aa54..758222d4e0f1 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java @@ -36,7 +36,6 @@ import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class RealtimeJsonIndexQuickStart extends QuickStartBase { @@ -57,7 +56,7 @@ public static void main(String[] args) public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "meetupRsvp"); File dataDir = new File(baseDir, "data"); Preconditions.checkState(dataDir.mkdirs()); @@ -74,7 +73,7 @@ public void execute() FileUtils.copyURLToFile(resource, tableConfigFile); QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); + QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java index 08301855df46..bcceaf8901a0 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java @@ -36,7 +36,6 @@ import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class RealtimeQuickStart extends QuickStartBase { @@ -57,7 +56,7 @@ public static void main(String[] args) public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "meetupRsvp"); File dataDir = new File(baseDir, "rawdata"); Preconditions.checkState(dataDir.mkdirs()); @@ -74,7 +73,7 @@ public void execute() FileUtils.copyURLToFile(resource, tableConfigFile); QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); + final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java index 7b24580ee5b9..c1240334c067 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStartWithMinion.java @@ -41,7 +41,6 @@ import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; /** @@ -72,7 +71,7 @@ public Map getConfigOverrides() { public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "githubEvents"); File dataDir = new File(baseDir, "rawdata"); Preconditions.checkState(dataDir.mkdirs()); @@ -99,7 +98,8 @@ public void execute() QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); QuickstartRunner runner = - new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, dataDir, true, null, getConfigOverrides()); + new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, + dataDir, true, null, getConfigOverrides(), null, true); printStatus(Color.CYAN, "***** Starting Kafka *****"); final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java index 9dcd169253ad..1cfe36837892 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java @@ -36,7 +36,6 @@ import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class UpsertJsonQuickStart extends QuickStartBase { @@ -57,7 +56,7 @@ public static void main(String[] args) public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File baseDir = new File(quickstartTmpDir, "meetupRsvp"); File dataDir = new File(baseDir, "data"); Preconditions.checkState(dataDir.mkdirs()); @@ -74,7 +73,7 @@ public void execute() FileUtils.copyURLToFile(resource, tableConfigFile); QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath()); - QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); + QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java index ebc5d3fc79ac..a80ea92aa9b6 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertQuickStart.java @@ -37,7 +37,6 @@ import org.apache.pinot.tools.utils.KafkaStarterUtils; import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; -import static org.apache.pinot.tools.Quickstart.printStatus; public class UpsertQuickStart extends QuickStartBase { @@ -58,7 +57,7 @@ public static void main(String[] args) public void execute() throws Exception { - File quickstartTmpDir = new File(_tmpDir, String.valueOf(System.currentTimeMillis())); + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); File bootstrapTableDir = new File(quickstartTmpDir, "meetupRsvp"); File dataDir = new File(bootstrapTableDir, "data"); Preconditions.checkState(dataDir.mkdirs()); @@ -75,7 +74,7 @@ public void execute() FileUtils.copyURLToFile(resource, tableConfigFile); QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath()); - final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, dataDir); + final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir); printStatus(Color.CYAN, "***** Starting Kafka *****"); final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java index a4b6111b91b4..43927a4d399f 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickStartCommand.java @@ -43,6 +43,10 @@ public class QuickStartCommand extends AbstractBaseAdminCommand implements Comma description = "Temp Directory to host quickstart data") private String _tmpDir; + @CommandLine.Option(names = {"-zkAddress", "-zkUrl", "-zkExternalAddress"}, required = false, + description = "URL for an external Zookeeper instance instead of using the default embedded instance") + private String _zkExternalAddress; + @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, description = "Print this message.") private boolean _help = false; @@ -70,6 +74,14 @@ public void setTmpDir(String tmpDir) { _tmpDir = tmpDir; } + public String getZkExternalAddress() { + return _zkExternalAddress; + } + + public void setZkExternalAddress(String zkExternalAddress) { + _zkExternalAddress = zkExternalAddress; + } + @Override public String toString() { return ("QuickStart -type " + _type); @@ -111,8 +123,13 @@ public boolean execute() throws Exception { QuickStartBase quickstart = selectQuickStart(_type); if (_tmpDir != null) { - quickstart.setTmpDir(_tmpDir); + quickstart.setDataDir(_tmpDir); + } + + if (_zkExternalAddress != null) { + quickstart.setZkExternalAddress(_zkExternalAddress); } + quickstart.execute(); return true; } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index 8d602445a383..7387e42c177d 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -75,14 +75,24 @@ public class QuickstartRunner { private final boolean _enableTenantIsolation; private final String _authToken; private final Map _configOverrides; + private final boolean _deleteExistingData; + + // If this field is non-null, an embedded Zookeeper instance will not be launched + private final String _zkExternalAddress; private final List _controllerPorts = new ArrayList<>(); private final List _brokerPorts = new ArrayList<>(); private boolean _isStopped = false; + public QuickstartRunner(List tableRequests, int numControllers, int numBrokers, + int numServers, int numMinions, File tempDir) + throws Exception { + this(tableRequests, numControllers, numBrokers, numServers, numMinions, tempDir, true, null, null, null, true); + } + public QuickstartRunner(List tableRequests, int numControllers, int numBrokers, int numServers, int numMinions, File tempDir, boolean enableIsolation, String authToken, - Map configOverrides) + Map configOverrides, String zkExternalAddress, boolean deleteExistingData) throws Exception { _tableRequests = tableRequests; _numControllers = numControllers; @@ -93,13 +103,11 @@ public QuickstartRunner(List tableRequests, int numContr _enableTenantIsolation = enableIsolation; _authToken = authToken; _configOverrides = configOverrides; - clean(); - } - - public QuickstartRunner(List tableRequests, int numControllers, int numBrokers, - int numServers, File tempDir) - throws Exception { - this(tableRequests, numControllers, numBrokers, numServers, 0, tempDir, true, null, null); + _zkExternalAddress = zkExternalAddress; + _deleteExistingData = deleteExistingData; + if (deleteExistingData) { + clean(); + } } private void startZookeeper() @@ -114,7 +122,8 @@ private void startControllers() throws Exception { for (int i = 0; i < _numControllers; i++) { StartControllerCommand controllerStarter = new StartControllerCommand(); - controllerStarter.setControllerPort(String.valueOf(DEFAULT_CONTROLLER_PORT + i)).setZkAddress(ZK_ADDRESS) + controllerStarter.setControllerPort(String.valueOf(DEFAULT_CONTROLLER_PORT + i)) + .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS) .setClusterName(CLUSTER_NAME).setTenantIsolation(_enableTenantIsolation) .setDataDir(new File(_tempDir, DEFAULT_CONTROLLER_DIR + i).getAbsolutePath()) .setConfigOverrides(_configOverrides); @@ -127,7 +136,9 @@ private void startBrokers() throws Exception { for (int i = 0; i < _numBrokers; i++) { StartBrokerCommand brokerStarter = new StartBrokerCommand(); - brokerStarter.setPort(DEFAULT_BROKER_PORT + i).setZkAddress(ZK_ADDRESS).setClusterName(CLUSTER_NAME) + brokerStarter.setPort(DEFAULT_BROKER_PORT + i) + .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS) + .setClusterName(CLUSTER_NAME) .setConfigOverrides(_configOverrides); brokerStarter.execute(); _brokerPorts.add(DEFAULT_BROKER_PORT + i); @@ -139,7 +150,8 @@ private void startServers() for (int i = 0; i < _numServers; i++) { StartServerCommand serverStarter = new StartServerCommand(); serverStarter.setPort(DEFAULT_SERVER_NETTY_PORT + i).setAdminPort(DEFAULT_SERVER_ADMIN_API_PORT + i) - .setZkAddress(ZK_ADDRESS).setClusterName(CLUSTER_NAME) + .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS) + .setClusterName(CLUSTER_NAME) .setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath()) .setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath()) .setConfigOverrides(_configOverrides); @@ -151,7 +163,9 @@ private void startMinions() throws Exception { for (int i = 0; i < _numMinions; i++) { StartMinionCommand minionStarter = new StartMinionCommand(); - minionStarter.setMinionPort(DEFAULT_MINION_PORT + i).setZkAddress(ZK_ADDRESS).setClusterName(CLUSTER_NAME) + minionStarter.setMinionPort(DEFAULT_MINION_PORT + i) + .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS) + .setClusterName(CLUSTER_NAME) .setConfigOverrides(_configOverrides); minionStarter.execute(); } @@ -165,7 +179,9 @@ private void clean() public void startAll() throws Exception { registerDefaultPinotFS(); - startZookeeper(); + if (_zkExternalAddress == null) { + startZookeeper(); + } startControllers(); startBrokers(); startServers(); @@ -180,10 +196,13 @@ public void stop() // TODO: Stop Minion StopProcessCommand stopper = new StopProcessCommand(false); - stopper.stopController().stopBroker().stopServer().stopZookeeper(); + if (_zkExternalAddress == null) { + stopper.stopController().stopBroker().stopServer().stopZookeeper(); + } stopper.execute(); - clean(); - + if (_deleteExistingData) { + clean(); + } _isStopped = true; } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java index 2e6d095eb972..b90386bb8f33 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java @@ -34,6 +34,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; +import org.apache.pinot.tools.QuickStartBase; import org.apache.pinot.tools.Quickstart; import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.slf4j.Logger; @@ -70,7 +71,7 @@ public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroF _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); _service = Executors.newFixedThreadPool(1); - Quickstart.printStatus(Quickstart.Color.YELLOW, + QuickStartBase.printStatus(Quickstart.Color.YELLOW, "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time " + "every 60 events (which is approximately 60 seconds) *****"); } diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/TestQuickStartCommand.java b/pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/TestQuickStartCommand.java index 96257a6c063c..62f0c6accb6f 100644 --- a/pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/TestQuickStartCommand.java +++ b/pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/TestQuickStartCommand.java @@ -20,6 +20,7 @@ import java.lang.reflect.InvocationTargetException; import org.apache.pinot.tools.BatchQuickstartWithMinion; +import org.apache.pinot.tools.EmptyQuickstart; import org.apache.pinot.tools.HybridQuickstart; import org.apache.pinot.tools.JoinQuickStart; import org.apache.pinot.tools.JsonIndexQuickStart; @@ -59,6 +60,9 @@ public void testMatchStringToCommand() Assert.assertEquals(quickStartClassFor("offline"), Quickstart.class); Assert.assertEquals(quickStartClassFor("BATCH"), Quickstart.class); + Assert.assertEquals(quickStartClassFor("EMPTY"), EmptyQuickstart.class); + Assert.assertEquals(quickStartClassFor("DEFAULT"), EmptyQuickstart.class); + Assert.assertEquals(quickStartClassFor("OFFLINE_MINION"), BatchQuickstartWithMinion.class); Assert.assertEquals(quickStartClassFor("BATCH_MINION"), BatchQuickstartWithMinion.class); Assert.assertEquals(quickStartClassFor("OFFLINE-MINION"), BatchQuickstartWithMinion.class);