Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a config file to override quickstart configs #8059

Merged
merged 1 commit into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;

Expand All @@ -37,10 +36,6 @@ public String getAuthToken() {
return null;
}

public Map<String, Object> getConfigOverrides() {
return null;
}

public void execute()
throws Exception {
File quickstartTmpDir = new File(_dataDir.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProvider;
Expand All @@ -33,7 +35,6 @@
import org.slf4j.LoggerFactory;

import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
import static org.apache.pinot.tools.Quickstart.printStatus;


/**
Expand All @@ -48,7 +49,7 @@
* ingestion_job_spec.json
* </code>
*/
public class GenericQuickstart {
public class GenericQuickstart extends QuickStartBase {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericQuickstart.class);
private final File _schemaFile;
private final File _tableConfigFile;
Expand All @@ -57,6 +58,11 @@ public class GenericQuickstart {
private StreamDataServerStartable _kafkaStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;

public GenericQuickstart() {
this(GenericQuickstart.class.getClassLoader().getResource("examples/batch/starbucksStores").getPath(),
"starbucksStores");
}

public GenericQuickstart(String tableDirectoryPath, String tableName) {
_tableDirectory = new File(tableDirectoryPath);
_tableName = tableName;
Expand All @@ -80,13 +86,19 @@ private void startKafka() {
_kafkaStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2));
}

@Override
public List<String> types() {
return Arrays.asList("GENERIC");
}

public void execute()
throws Exception {

File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
Preconditions.checkState(tempDir.mkdirs());
QuickstartTableRequest request = new QuickstartTableRequest(_tableDirectory.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
startKafka();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProvider;
Expand All @@ -34,7 +36,6 @@
import org.slf4j.LoggerFactory;

import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
import static org.apache.pinot.tools.Quickstart.printStatus;


/**
Expand All @@ -43,10 +44,14 @@
* Creates a realtime table pullRequestMergedEvents
* Starts the {@link PullRequestMergedEventsStream} to publish pullRequestMergedEvents into the topic
*/
public class GitHubEventsQuickstart {
public class GitHubEventsQuickstart extends QuickStartBase {
private static final Logger LOGGER = LoggerFactory.getLogger(GitHubEventsQuickstart.class);
private StreamDataServerStartable _kafkaStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private String _personalAccessToken;

public GitHubEventsQuickstart() {
}

private void startKafka() {
_zookeeperInstance = ZkStarter.startLocalZkServer();
Expand All @@ -60,7 +65,7 @@ private void startKafka() {
_kafkaStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2));
}

public void execute(String personalAccessToken)
private void execute(String personalAccessToken)
throws Exception {
final File quickStartDataDir =
new File(new File("githubEvents-" + System.currentTimeMillis()), "pullRequestMergedEvents");
Expand All @@ -84,7 +89,8 @@ public void execute(String personalAccessToken)
File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
Preconditions.checkState(tempDir.mkdirs());
QuickstartTableRequest request = new QuickstartTableRequest(quickStartDataDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
startKafka();
Expand Down Expand Up @@ -149,4 +155,20 @@ public void execute(String personalAccessToken)

printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
}

@Override
public List<String> types() {
return Arrays.asList("GITHUB-EVENTS", "GITHUB_EVENTS");
}

@Override
public void execute()
throws Exception {
execute(_personalAccessToken);
}

public GitHubEventsQuickstart setPersonalAccessToken(String personalAccessToken) {
_personalAccessToken = personalAccessToken;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void execute()
Preconditions.checkState(dataDir.mkdirs());
QuickstartTableRequest bootstrapTableRequest = prepareTableRequest(baseDir);
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest),
1, 1, 1, 0, dataDir);
1, 1, 1, 0, dataDir, getConfigOverrides());
printStatus(Color.YELLOW, "***** Starting Kafka *****");
startKafka();
printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void execute()

File tempDir = new File(quickstartTmpDir, "tmp");
FileUtils.forceMkdir(tempDir);
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request, dimTableRequest), 1, 1, 3, 0, tempDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request, dimTableRequest), 1, 1, 3, 0, tempDir, getConfigOverrides());

printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
runner.startAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public void execute()
FileUtils.copyURLToFile(resource, ingestionJobSpecFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
runner.startAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public void execute()
FileUtils.copyURLToFile(resource, ingestionJobSpecFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
runner.startAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.plugin.PluginManager;
Expand All @@ -36,7 +38,7 @@
import static org.apache.pinot.tools.Quickstart.printStatus;


public class PartialUpsertQuickStart {
public class PartialUpsertQuickStart extends QuickStartBase {
private StreamDataServerStartable _kafkaStarter;

public static void main(String[] args)
Expand All @@ -45,6 +47,11 @@ public static void main(String[] args)
new PartialUpsertQuickStart().execute();
}

@Override
public List<String> types() {
return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT");
}

// Todo: add a quick start demo
public void execute()
throws Exception {
Expand All @@ -66,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
*/
package org.apache.pinot.tools;

import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.utils.PinotConfigUtils;


public abstract class QuickStartBase {
protected File _dataDir = FileUtils.getTempDirectory();
protected String _zkExternalAddress;
protected String _configFilePath;

public QuickStartBase setDataDir(String dataDir) {
_dataDir = new File(dataDir);
Expand All @@ -38,6 +44,11 @@ public QuickStartBase setZkExternalAddress(String zkExternalAddress) {
return this;
}

public QuickStartBase setConfigFilePath(String configFilePath) {
_configFilePath = configFilePath;
return this;
}

public abstract List<String> types();

protected void waitForBootstrapToComplete(QuickstartRunner runner)
Expand All @@ -53,4 +64,13 @@ public static void printStatus(Quickstart.Color color, String message) {

public abstract void execute()
throws Exception;

protected Map<String, Object> getConfigOverrides() {
try {
return StringUtils.isEmpty(_configFilePath) ? ImmutableMap.of()
: PinotConfigUtils.readConfigFromFile(_configFilePath);
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
Expand Down Expand Up @@ -67,10 +66,6 @@ public String getAuthToken() {
return null;
}

public Map<String, Object> getConfigOverrides() {
return null;
}

public static String prettyPrintResponse(JsonNode response) {
StringBuilder responseBuilder = new StringBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -64,8 +63,8 @@ public static void main(String[] args)
}

public Map<String, Object> getConfigOverrides() {
Map<String, Object> properties = new HashMap<>();
properties.put("controller.task.scheduler.enabled", true);
Map<String, Object> properties = super.getConfigOverrides();
properties.putIfAbsent("controller.task.scheduler.enabled", true);
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner
= new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public String description() {
public boolean execute()
throws Exception {
PluginManager.get().init();
new GitHubEventsQuickstart().execute(_personalAccessToken);
new GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).execute();
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public class QuickStartCommand extends AbstractBaseAdminCommand implements Comma
description = "URL for an external Zookeeper instance instead of using the default embedded instance")
private String _zkExternalAddress;

@CommandLine.Option(names = {"-configFile", "-configFilePath"}, required = false,
description = "Config file path to override default pinot configs")
private String _configFilePath;

@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false,
description = "Print this message.")
private boolean _help = false;
Expand Down Expand Up @@ -130,6 +134,10 @@ public boolean execute() throws Exception {
quickstart.setZkExternalAddress(_zkExternalAddress);
}

if (_configFilePath != null) {
quickstart.setConfigFilePath(_configFilePath);
}

quickstart.execute();
return true;
}
Expand Down
Loading