From 4acfecb627e22017f3783cef4ad799888bea135d Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Mon, 6 Dec 2021 11:15:48 -0500 Subject: [PATCH 01/11] Support multiple plugin directories --- .../common/utils/TarGzCompressionUtils.java | 17 +++- .../HadoopSegmentGenerationJobRunner.java | 52 +++++++----- .../SparkSegmentGenerationJobRunner.java | 42 ++++++---- .../pinot/spi/plugin/PluginManager.java | 83 +++++++++++-------- 4 files changed, 118 insertions(+), 76 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java index b2090448c837..9e5cc038a03f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java @@ -27,6 +27,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.ArchiveInputStream; @@ -56,15 +57,23 @@ private TarGzCompressionUtils() { */ public static void createTarGzFile(File inputFile, File outputFile) throws IOException { + createTarGzFile((File) Collections.singletonList(inputFile), outputFile); + } + + public static void createTarGzFile(File[] inputFiles, File outputFile) + throws IOException { Preconditions.checkArgument(outputFile.getName().endsWith(TAR_GZ_FILE_EXTENSION), "Output file: %s does not have '.tar.gz' file extension", outputFile); try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath()); - BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut); - OutputStream gzipOut = new GzipCompressorOutputStream(bufferedOut); - TarArchiveOutputStream tarGzOut = new TarArchiveOutputStream(gzipOut)) { + BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut); + OutputStream gzipOut = new GzipCompressorOutputStream(bufferedOut); + TarArchiveOutputStream tarGzOut = new TarArchiveOutputStream(gzipOut)) { tarGzOut.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR); tarGzOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); - addFileToTarGz(tarGzOut, inputFile, ""); + + for (File inputFile : inputFiles) { + addFileToTarGz(tarGzOut, inputFile, ""); + } } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index be585bcc561c..bb472bb7d43b 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -63,6 +63,7 @@ import org.yaml.snakeyaml.Yaml; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ; import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME; @@ -390,27 +391,40 @@ protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS, URI } protected void 
packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) { - File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir()); - if (pluginsRootDir.exists()) { - try { - File pluginsTarGzFile = File.createTempFile("pinot-plugins-", ".tar.gz"); - TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile); - - // Copy to staging directory - Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ); - outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri()); - job.addCacheFile(cachedPluginsTarball.toUri()); - } catch (Exception e) { - LOGGER.error("Failed to tar plugins directory and upload to staging dir", e); - throw new RuntimeException(e); - } + String[] pluginDirectories = PluginManager.get().getPluginsDirectories(); + if (pluginDirectories.length == 0) { + LOGGER.warn("Plugin directories is null or empty, nothing to pack to distributed cache"); + return; + } - String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME); - if (pluginsIncludes != null) { - job.getConfiguration().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes); + ArrayList validPluginDirectories = new ArrayList(); + + for (String pluginsDirPath : pluginDirectories) { + File pluginsDir = new File(pluginsDirPath); + if (pluginsDir.exists()) { + validPluginDirectories.add(pluginsDir); + } else { + LOGGER.warn("Cannot find Pinot plugins directory at [{}]", pluginsDirPath); + return; } - } else { - LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", pluginsRootDir); + } + + File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ); + try { + TarGzCompressionUtils.createTarGzFile((File[]) validPluginDirectories.toArray(), pluginsTarGzFile); + + // Copy to staging directory + Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ); + outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri()); + job.addCacheFile(cachedPluginsTarball.toUri()); + } catch (Exception e) { + LOGGER.error("Failed to tar plugins directories and upload to staging dir", e); + throw new RuntimeException(e); + } + + String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME); + if (pluginsIncludes != null) { + job.getConfiguration().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes); } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index 5b337d6509b4..e6dbd2444ffa 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -374,26 +374,34 @@ protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, Strin } protected void packPluginsToDistributedCache(JavaSparkContext sparkContext) { - String pluginsRootDirPath = PluginManager.get().getPluginsRootDir(); - if (pluginsRootDirPath == null) { - LOGGER.warn("Local Pinot plugins directory is null, skip packaging..."); + String[] pluginDirectories = PluginManager.get().getPluginsDirectories(); + if 
(pluginDirectories.length == 0) { + LOGGER.warn("Plugin directories is null or empty, skip packaging..."); return; } - File pluginsRootDir = new File(pluginsRootDirPath); - if (pluginsRootDir.exists()) { - File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ); - try { - TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile); - } catch (IOException e) { - LOGGER.error("Failed to tar plugins directory", e); - } - sparkContext.addFile(pluginsTarGzFile.getAbsolutePath()); - String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME); - if (pluginsIncludes != null) { - sparkContext.getConf().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes); + + ArrayList validPluginDirectories = new ArrayList(); + + for (String pluginsDirPath : pluginDirectories) { + File pluginsDir = new File(pluginsDirPath); + if (pluginsDir.exists()) { + validPluginDirectories.add(pluginsDir); + } else { + LOGGER.warn("Cannot find Pinot plugins directory at [{}]", pluginsDirPath); + return; } - } else { - LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", pluginsRootDirPath); + } + + File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ); + try { + TarGzCompressionUtils.createTarGzFile((File[]) validPluginDirectories.toArray(), pluginsTarGzFile); + } catch (IOException e) { + LOGGER.error("Failed to tar plugins directories", e); + } + sparkContext.addFile(pluginsTarGzFile.getAbsolutePath()); + String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME); + if (pluginsIncludes != null) { + sparkContext.getConf().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java index d9bfc5a8c626..0cb0ab61fa7b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java @@ -110,7 +110,7 @@ public class PluginManager { }; private Map _registry = new HashMap<>(); - private String _pluginsRootDir; + private String _pluginsDirectories; private String _pluginsInclude; private boolean _initialized = false; @@ -124,10 +124,10 @@ public synchronized void init() { return; } try { - _pluginsRootDir = System.getProperty(PLUGINS_DIR_PROPERTY_NAME); + _pluginsDirectories = System.getProperty(PLUGINS_DIR_PROPERTY_NAME); } catch (Exception e) { LOGGER.error("Failed to load env variable {}", PLUGINS_DIR_PROPERTY_NAME, e); - _pluginsRootDir = null; + _pluginsDirectories = null; } try { _pluginsInclude = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME); @@ -135,44 +135,55 @@ public synchronized void init() { LOGGER.error("Failed to load env variable {}", PLUGINS_INCLUDE_PROPERTY_NAME, e); _pluginsInclude = null; } - init(_pluginsRootDir, _pluginsInclude); + init(_pluginsDirectories, _pluginsInclude); _initialized = true; } - private void init(String pluginsRootDir, String pluginsInclude) { - if (StringUtils.isEmpty(pluginsRootDir)) { + private void init(String pluginsDirectories, String pluginsInclude) { + if (StringUtils.isEmpty(pluginsDirectories)) { LOGGER.info("Env variable '{}' is not specified. 
Set this env variable to load additional plugins.", PLUGINS_DIR_PROPERTY_NAME); return; } else { - if (!new File(pluginsRootDir).exists()) { - LOGGER.warn("Plugins root dir [{}] doesn't exist.", pluginsRootDir); - return; - } - LOGGER.info("Plugins root dir is [{}]", pluginsRootDir); - } - Collection jarFiles = FileUtils.listFiles(new File(pluginsRootDir), new String[]{JAR_FILE_EXTENSION}, true); - List pluginsToLoad = null; - if (!StringUtils.isEmpty(pluginsInclude)) { - pluginsToLoad = Arrays.asList(pluginsInclude.split(",")); - LOGGER.info("Trying to load plugins: [{}]", Arrays.toString(pluginsToLoad.toArray())); - } else { - LOGGER.info("Please use env variable '{}' to customize plugins to load. Loading all plugins: {}", - PLUGINS_INCLUDE_PROPERTY_NAME, Arrays.toString(jarFiles.toArray())); - } - for (File jarFile : jarFiles) { - File pluginDir = jarFile.getParentFile(); - String pluginName = pluginDir.getName(); - if (pluginsToLoad != null) { - if (!pluginsToLoad.contains(pluginName)) { - continue; + String[] directories = pluginsDirectories.split(";"); + LOGGER.info("Plugin directories env: {}, parsed directories to load: '{}'", pluginsDirectories, directories); + + for (String pluginsDirectory : directories) { + if (!new File(pluginsDirectory).exists()) { + LOGGER.warn("Plugins dir [{}] doesn't exist.", pluginsDirectory); + return; + } + + Collection jarFiles = FileUtils.listFiles( + new File(pluginsDirectory), + new String[]{JAR_FILE_EXTENSION}, + true + ); + List pluginsToLoad = null; + if (!StringUtils.isEmpty(pluginsInclude)) { + pluginsToLoad = Arrays.asList(pluginsInclude.split(";")); + LOGGER.info("Trying to load plugins: [{}]", Arrays.toString(pluginsToLoad.toArray())); + } else { + LOGGER.info("Please use env variable '{}' to customize plugins to load. 
Loading all plugins: {}", + PLUGINS_INCLUDE_PROPERTY_NAME, Arrays.toString(jarFiles.toArray())); + } + for (File jarFile : jarFiles) { + File pluginDir = jarFile.getParentFile(); + String pluginName = pluginDir.getName(); + LOGGER.info("Found plugin, pluginDir: {}, pluginName: {}", pluginDir, pluginName); + if (pluginsToLoad != null) { + if (!pluginsToLoad.contains(pluginName)) { + LOGGER.info("Skipping plugin: {} is not inside {}", pluginName, pluginsToLoad); + continue; + } + } + try { + load(pluginName, pluginDir); + LOGGER.info("Successfully Loaded plugin [{}] from dir [{}]", pluginName, pluginDir); + } catch (Exception e) { + LOGGER.error("Failed to load plugin [{}] from dir [{}]", pluginName, pluginDir, e); + } } - } - try { - load(pluginName, pluginDir); - LOGGER.info("Successfully Loaded plugin [{}] from dir [{}]", pluginName, pluginDir); - } catch (Exception e) { - LOGGER.error("Failed to load plugin [{}] from dir [{}]", pluginName, pluginDir, e); } } initRecordReaderClassMap(); @@ -312,8 +323,8 @@ public T createInstance(String pluginName, String className, Class[] argType return (T) instance; } - public String getPluginsRootDir() { - return _pluginsRootDir; + public String[] getPluginsDirectories() { + return _pluginsDirectories.split(";"); } public static PluginManager get() { @@ -338,4 +349,4 @@ public void registerRecordReaderClass(String inputFormat, String recordReaderCla INPUT_FORMAT_TO_RECORD_READER_CONFIG_CLASS_NAME_MAP.put(inputFormat.toLowerCase(), recordReaderConfigClass); } } -} +} \ No newline at end of file From 1db47ecd639f56f985fb8d96c81ce940aa7f003b Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Mon, 6 Dec 2021 15:05:26 -0500 Subject: [PATCH 02/11] Clarify comment --- .../main/java/org/apache/pinot/spi/plugin/PluginManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java index 0cb0ab61fa7b..435b85346ed4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java @@ -173,7 +173,7 @@ private void init(String pluginsDirectories, String pluginsInclude) { LOGGER.info("Found plugin, pluginDir: {}, pluginName: {}", pluginDir, pluginName); if (pluginsToLoad != null) { if (!pluginsToLoad.contains(pluginName)) { - LOGGER.info("Skipping plugin: {} is not inside {}", pluginName, pluginsToLoad); + LOGGER.info("Skipping plugin: {} is not inside pluginsToLoad {}", pluginName, pluginsToLoad); continue; } } From 2722e27bdfd08c6fe548ebb7c8eb706fc9223460 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Mon, 6 Dec 2021 15:09:03 -0500 Subject: [PATCH 03/11] update appAssemblerScriptTemplate to add multiple plugin dirs --- .../main/resources/appAssemblerScriptTemplate | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/pinot-tools/src/main/resources/appAssemblerScriptTemplate b/pinot-tools/src/main/resources/appAssemblerScriptTemplate index e3c7459933b7..65bae6ebef8e 100644 --- a/pinot-tools/src/main/resources/appAssemblerScriptTemplate +++ b/pinot-tools/src/main/resources/appAssemblerScriptTemplate @@ -109,35 +109,38 @@ fi # Set $PLUGINS_CLASSPATH for plugin jars to be put into classpath. # $PLUGINS_DIR and $PLUGINS_INCLUDE are used if $PLUGINS_CLASSPATH is not set. -# $PLUGINS_DIR is the root directory of plugins directory, default to '"$BASEDIR"/plugins' if not set. 
+# $PLUGINS_DIR is semi-colon separated list of plugin directories, default to '"$BASEDIR"/plugins' if not set. # $PLUGINS_INCLUDE is semi-colon separated plugins name, e.g. pinot-avro;pinot-batch-ingestion-standalone. Default is not set, which means load all the plugin jars. if [ -z "$PLUGINS_CLASSPATH" ] ; then if [ -z "$PLUGINS_DIR" ] ; then PLUGINS_DIR="$BASEDIR"/plugins fi - if [ -d "$PLUGINS_DIR" ] ; then - if [ -n "$PLUGINS_INCLUDE" ] ; then - export IFS=";" - for PLUGIN_JAR in $PLUGINS_INCLUDE; do - PLUGIN_JAR_PATH=$(find "$PLUGINS_DIR" -path \*/"$PLUGIN_JAR"/"$PLUGIN_JAR"-\*.jar) + + export IFS=';' + for DIR in $PLUGINS_DIR; do + if [ -d "$DIR" ] ; then + if [ -n "$PLUGINS_INCLUDE" ] ; then + for PLUGIN_JAR in $PLUGINS_INCLUDE; do + PLUGIN_JAR_PATH=$(find "$DIR" -path \*/"$PLUGIN_JAR"/"$PLUGIN_JAR"-\*.jar) + if [ -n "$PLUGINS_CLASSPATH" ] ; then + PLUGINS_CLASSPATH=$PLUGINS_CLASSPATH:$PLUGIN_JAR_PATH + else + PLUGINS_CLASSPATH=$PLUGIN_JAR_PATH + fi + done + else + PLUGIN_JARS=$(find "$DIR" -name \*.jar) + for PLUGIN_JAR in $PLUGIN_JARS ; do if [ -n "$PLUGINS_CLASSPATH" ] ; then - PLUGINS_CLASSPATH=$PLUGINS_CLASSPATH:$PLUGIN_JAR_PATH + PLUGINS_CLASSPATH=$PLUGINS_CLASSPATH:$PLUGIN_JAR else - PLUGINS_CLASSPATH=$PLUGIN_JAR_PATH + PLUGINS_CLASSPATH=$PLUGIN_JAR fi - done - unset IFS - else - PLUGIN_JARS=$(find "$PLUGINS_DIR" -name \*.jar) - for PLUGIN_JAR in $PLUGIN_JARS ; do - if [ -n "$PLUGINS_CLASSPATH" ] ; then - PLUGINS_CLASSPATH=$PLUGINS_CLASSPATH:$PLUGIN_JAR - else - PLUGINS_CLASSPATH=$PLUGIN_JAR - fi - done + done + fi fi - fi + done + unset IFS fi if [ -n "$PLUGINS_CLASSPATH" ] ; then From f32149c151ebb05e9e6485fba5ae95fed5b1fee4 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Mon, 6 Dec 2021 15:50:21 -0500 Subject: [PATCH 04/11] Fix list/arr conversion --- .../org/apache/pinot/common/utils/TarGzCompressionUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java index 9e5cc038a03f..5535e4bbeeb5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java @@ -27,7 +27,6 @@ import java.io.OutputStream; import java.nio.file.Files; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.ArchiveInputStream; @@ -57,7 +56,7 @@ private TarGzCompressionUtils() { */ public static void createTarGzFile(File inputFile, File outputFile) throws IOException { - createTarGzFile((File) Collections.singletonList(inputFile), outputFile); + createTarGzFile(new File[] {inputFile}, outputFile); } public static void createTarGzFile(File[] inputFiles, File outputFile) From 166d3304b80aa8025a15980497c09eb9c78d1b51 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Mon, 6 Dec 2021 17:13:50 -0500 Subject: [PATCH 05/11] Add testDirectories test in TarGzCompressionUtilsTest --- .../utils/TarGzCompressionUtilsTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java index 829a1ba2a210..b23a6ba8c029 100644 --- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java @@ -81,6 +81,48 @@ public void testFile() assertEquals(FileUtils.readFileToString(untarredFile), fileContent); } + @Test + public void testDirectories() + throws IOException { + String dirToTarName1 = "dir1"; + String dirToTarName2 = "dir2"; + File dir1 = new File(DATA_DIR, dirToTarName1); + File dir2 = new File(DATA_DIR, dirToTarName2); + + File[] dirsToTar = new File[] {dir1, dir2}; + + String fileName1 = "data1"; + String fileContent1 = "fileContent1"; + String fileName2 = "data2"; + String fileContent2 = "fileContent2"; + FileUtils.write(new File(dir1, fileName1), fileContent1); + FileUtils.write(new File(dir2, fileName2), fileContent2); + + String outputTarName = "output_tar" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION; + File tarGzFile = new File(TAR_DIR, outputTarName); + TarGzCompressionUtils.createTarGzFile(dirsToTar, tarGzFile); + + List untarredFiles = TarGzCompressionUtils.untar(tarGzFile, UNTAR_DIR); + assertEquals(untarredFiles.size(), 4); + + File untarredFileDir1 = untarredFiles.get(0); + File untarredFileDir2 = untarredFiles.get(1); + + assertEquals(untarredFileDir1, new File(UNTAR_DIR, dirToTarName1)); + assertEquals(untarredFileDir2, new File(UNTAR_DIR, dirToTarName2)); + + File[] filesDir1 = untarredFileDir1.listFiles(); + assertNotNull(filesDir1); + assertEquals(filesDir1.length, 1); + assertEquals(FileUtils.readFileToString(new File(untarredFileDir1, fileName1)), fileContent1); + + + File[] filesDir2 = untarredFileDir2.listFiles(); + assertNotNull(filesDir2); + assertEquals(filesDir2.length, 1); + assertEquals(FileUtils.readFileToString(new File(untarredFileDir2, fileName2)), fileContent2); + } + @Test public void testDirectory() throws IOException { From 197bd5eb3c6a28dd78078963618116fcc0a27266 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Tue, 7 Dec 2021 02:00:33 -0500 Subject: [PATCH 06/11] Fix bash script IFS unset --- .../apache/pinot/common/utils/TarGzCompressionUtilsTest.java | 2 +- pinot-tools/src/main/resources/appAssemblerScriptTemplate | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java index b23a6ba8c029..8b63f0d1bd72 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java @@ -106,7 +106,7 @@ public void testDirectories() assertEquals(untarredFiles.size(), 4); File untarredFileDir1 = untarredFiles.get(0); - File untarredFileDir2 = untarredFiles.get(1); + File untarredFileDir2 = untarredFiles.get(2); assertEquals(untarredFileDir1, new File(UNTAR_DIR, dirToTarName1)); assertEquals(untarredFileDir2, new File(UNTAR_DIR, dirToTarName2)); diff --git a/pinot-tools/src/main/resources/appAssemblerScriptTemplate b/pinot-tools/src/main/resources/appAssemblerScriptTemplate index 65bae6ebef8e..359ab9943460 100644 --- a/pinot-tools/src/main/resources/appAssemblerScriptTemplate +++ b/pinot-tools/src/main/resources/appAssemblerScriptTemplate @@ -119,7 +119,9 @@ if [ -z "$PLUGINS_CLASSPATH" ] ; then export IFS=';' for DIR in $PLUGINS_DIR; do if [ -d "$DIR" ] ; then + unset IFS if [ -n "$PLUGINS_INCLUDE" ] ; then + export IFS=';' for PLUGIN_JAR in 
$PLUGINS_INCLUDE; do PLUGIN_JAR_PATH=$(find "$DIR" -path \*/"$PLUGIN_JAR"/"$PLUGIN_JAR"-\*.jar) if [ -n "$PLUGINS_CLASSPATH" ] ; then @@ -128,6 +130,7 @@ if [ -z "$PLUGINS_CLASSPATH" ] ; then PLUGINS_CLASSPATH=$PLUGIN_JAR_PATH fi done + unset IFS else PLUGIN_JARS=$(find "$DIR" -name \*.jar) for PLUGIN_JAR in $PLUGIN_JARS ; do From cef7a1faa3d1d6ecb699f8226a1d4b8bf0475f05 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Tue, 7 Dec 2021 02:53:52 -0500 Subject: [PATCH 07/11] Don't check for both null and empty --- .../batch/hadoop/HadoopSegmentGenerationJobRunner.java | 4 ++-- .../batch/spark/SparkSegmentGenerationJobRunner.java | 4 ++-- .../main/java/org/apache/pinot/spi/plugin/PluginManager.java | 5 ++++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index bb472bb7d43b..0c735e9093db 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -392,8 +392,8 @@ protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS, URI protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) { String[] pluginDirectories = PluginManager.get().getPluginsDirectories(); - if (pluginDirectories.length == 0) { - LOGGER.warn("Plugin directories is null or empty, nothing to pack to distributed cache"); + if (pluginDirectories == null) { + LOGGER.warn("Plugin directories is null, nothing to pack to distributed cache"); return; } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index e6dbd2444ffa..a689b77dd060 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -375,8 +375,8 @@ protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, Strin protected void packPluginsToDistributedCache(JavaSparkContext sparkContext) { String[] pluginDirectories = PluginManager.get().getPluginsDirectories(); - if (pluginDirectories.length == 0) { - LOGGER.warn("Plugin directories is null or empty, skip packaging..."); + if (pluginDirectories == null) { + LOGGER.warn("Plugin directories is null, skipping packaging..."); return; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java index 435b85346ed4..e34767fb1732 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java @@ -324,7 +324,10 @@ public T createInstance(String pluginName, String className, Class[] argType } public String[] getPluginsDirectories() { - return _pluginsDirectories.split(";"); + if (_pluginsDirectories != null) { + return _pluginsDirectories.split(";"); + } + return null; } public static PluginManager get() { From 541911a4db11713446d6e862afa9d32515547453 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Tue, 7 Dec 2021 03:31:01 -0500 Subject: [PATCH 08/11] Fix conversion error --- .../batch/hadoop/HadoopSegmentGenerationJobRunner.java | 3 ++- .../ingestion/batch/spark/SparkSegmentGenerationJobRunner.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index 0c735e9093db..4cbfdf577ea0 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -411,7 +411,8 @@ protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI s File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ); try { - TarGzCompressionUtils.createTarGzFile((File[]) validPluginDirectories.toArray(), pluginsTarGzFile); + File[] files = validPluginDirectories.toArray(new File[0]); + TarGzCompressionUtils.createTarGzFile(files, pluginsTarGzFile); // Copy to staging directory Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index a689b77dd060..70848150e1cf 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -394,7 +394,8 @@ protected void packPluginsToDistributedCache(JavaSparkContext sparkContext) { File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ); try { - TarGzCompressionUtils.createTarGzFile((File[]) validPluginDirectories.toArray(), pluginsTarGzFile); + File[] files = validPluginDirectories.toArray(new File[0]); + TarGzCompressionUtils.createTarGzFile(files, pluginsTarGzFile); } catch (IOException e) { LOGGER.error("Failed to tar plugins directories", e); } From 6c4365bdeea4d0c3a4f9909c90d60e885333d76d Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Wed, 8 Dec 2021 18:29:20 -0500 Subject: [PATCH 09/11] Addr review comments + test for multiple plugin loading --- .../common/utils/TarGzCompressionUtils.java | 4 + .../utils/TarGzCompressionUtilsTest.java | 9 ++ .../pinot/spi/plugin/PluginManager.java | 98 
+++++++++++++------ .../pinot/spi/plugin/PluginManagerTest.java | 84 ++++++++++++++-- .../main/resources/appAssemblerScriptTemplate | 1 + 5 files changed, 156 insertions(+), 40 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java index 5535e4bbeeb5..cd3e5138f5c2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java @@ -59,6 +59,10 @@ public static void createTarGzFile(File inputFile, File outputFile) createTarGzFile(new File[] {inputFile}, outputFile); } + /** + * Creates a tar.gz file from a list of input file/directories to the output file. The output file must have + * ".tar.gz" as the file extension. + */ public static void createTarGzFile(File[] inputFiles, File outputFile) throws IOException { Preconditions.checkArgument(outputFile.getName().endsWith(TAR_GZ_FILE_EXTENSION), diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java index 8b63f0d1bd72..128f87933bc3 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/TarGzCompressionUtilsTest.java @@ -105,6 +105,15 @@ public void testDirectories() List untarredFiles = TarGzCompressionUtils.untar(tarGzFile, UNTAR_DIR); assertEquals(untarredFiles.size(), 4); + /* + untarredFiles ends up being a list as follows: + /dir1/ + /dir1/data1 + /dir2/ + /dir2/data2 + + To fetch the 2 directories we want for the following assertions, we expect them at indexes 0 and 2. 
+ */ File untarredFileDir1 = untarredFiles.get(0); File untarredFileDir2 = untarredFiles.get(2); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java index e34767fb1732..2c4c69e8d68a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.spi.plugin; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.lang.reflect.Constructor; import java.net.MalformedURLException; @@ -145,38 +146,14 @@ private void init(String pluginsDirectories, String pluginsInclude) { PLUGINS_DIR_PROPERTY_NAME); return; } else { - String[] directories = pluginsDirectories.split(";"); - LOGGER.info("Plugin directories env: {}, parsed directories to load: '{}'", pluginsDirectories, directories); + try { + HashMap plugins = getPluginsToLoad(pluginsDirectories, pluginsInclude); + LOGGER.info("#getPluginsToLoad has produced {} plugins to load", plugins.size()); - for (String pluginsDirectory : directories) { - if (!new File(pluginsDirectory).exists()) { - LOGGER.warn("Plugins dir [{}] doesn't exist.", pluginsDirectory); - return; - } + for (Map.Entry entry : plugins.entrySet()) { + String pluginName = entry.getKey(); + File pluginDir = entry.getValue(); - Collection jarFiles = FileUtils.listFiles( - new File(pluginsDirectory), - new String[]{JAR_FILE_EXTENSION}, - true - ); - List pluginsToLoad = null; - if (!StringUtils.isEmpty(pluginsInclude)) { - pluginsToLoad = Arrays.asList(pluginsInclude.split(";")); - LOGGER.info("Trying to load plugins: [{}]", Arrays.toString(pluginsToLoad.toArray())); - } else { - LOGGER.info("Please use env variable '{}' to customize plugins to load. Loading all plugins: {}", - PLUGINS_INCLUDE_PROPERTY_NAME, Arrays.toString(jarFiles.toArray())); - } - for (File jarFile : jarFiles) { - File pluginDir = jarFile.getParentFile(); - String pluginName = pluginDir.getName(); - LOGGER.info("Found plugin, pluginDir: {}, pluginName: {}", pluginDir, pluginName); - if (pluginsToLoad != null) { - if (!pluginsToLoad.contains(pluginName)) { - LOGGER.info("Skipping plugin: {} is not inside pluginsToLoad {}", pluginName, pluginsToLoad); - continue; - } - } try { load(pluginName, pluginDir); LOGGER.info("Successfully Loaded plugin [{}] from dir [{}]", pluginName, pluginDir); @@ -184,9 +161,68 @@ private void init(String pluginsDirectories, String pluginsInclude) { LOGGER.error("Failed to load plugin [{}] from dir [{}]", pluginName, pluginDir, e); } } + + initRecordReaderClassMap(); + } catch (IllegalArgumentException e) { + LOGGER.warn(e.getMessage()); + } + } + } + + /** + * This method will take a semi-colon delimited string of directories and a semi-colon delimited string of plugin + * names. It will traverse the directories in order and produce a map of plugins to be loaded. + * If a plugin is found in multiple directories, only the first copy of it will be picked up. 
+ * @param pluginsDirectories + * @param pluginsInclude + * @return A hash map with key = plugin name, value = file object + */ + @VisibleForTesting + public HashMap getPluginsToLoad(String pluginsDirectories, String pluginsInclude) throws + IllegalArgumentException { + String[] directories = pluginsDirectories.split(";"); + LOGGER.info("Plugin directories env: {}, parsed directories to load: '{}'", pluginsDirectories, directories); + + HashMap finalPluginsToLoad = new HashMap<>(); + + for (String pluginsDirectory : directories) { + if (!new File(pluginsDirectory).exists()) { + throw new IllegalArgumentException(String.format("Plugins dir [{}] doesn't exist.", pluginsDirectory)); + } + + Collection jarFiles = FileUtils.listFiles( + new File(pluginsDirectory), + new String[]{JAR_FILE_EXTENSION}, + true + ); + List pluginsToLoad = null; + if (!StringUtils.isEmpty(pluginsInclude)) { + pluginsToLoad = Arrays.asList(pluginsInclude.split(";")); + LOGGER.info("Potential plugins to load: [{}]", Arrays.toString(pluginsToLoad.toArray())); + } else { + LOGGER.info("Please use env variable '{}' to customize plugins to load. Loading all plugins: {}", + PLUGINS_INCLUDE_PROPERTY_NAME, Arrays.toString(jarFiles.toArray())); + } + + for (File jarFile : jarFiles) { + File pluginDir = jarFile.getParentFile(); + String pluginName = pluginDir.getName(); + LOGGER.info("Found plugin, pluginDir: {}, pluginName: {}", pluginDir, pluginName); + if (pluginsToLoad != null) { + if (!pluginsToLoad.contains(pluginName)) { + LOGGER.info("Skipping plugin: {} is not inside pluginsToLoad {}", pluginName, pluginsToLoad); + continue; + } + } + + if (!finalPluginsToLoad.containsKey(pluginName)) { + finalPluginsToLoad.put(pluginName, pluginDir); + LOGGER.info("Added [{}] from dir [{}] to final list of plugins to load", pluginName, pluginDir); + } } } - initRecordReaderClassMap(); + + return finalPluginsToLoad; } private void initRecordReaderClassMap() { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java index 825e63612ed9..7dfeb09abccb 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java @@ -20,7 +20,11 @@ import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; import javax.tools.JavaCompiler; @@ -42,22 +46,84 @@ public class PluginManagerTest { private String _jarFile; private File _jarDirFile; - @BeforeClass - public void setup() { + private File _p1; + private File _p1Copy; + private File _p2; + private File _p3; + private File _p4; + @BeforeClass + public void setup() throws IOException { _tempDir = new File(System.getProperty("java.io.tmpdir"), "pinot-plugin-test"); - _tempDir.delete(); + FileUtils.deleteDirectory(_tempDir); _tempDir.mkdirs(); + } - String jarDir = _tempDir + "/test-record-reader"; - _jarFile = jarDir + "/test-record-reader.jar"; - _jarDirFile = new File(jarDir); - _jarDirFile.mkdirs(); + @Test + public void testGetPluginsToLoad() throws IOException { + /* We have two plugin directories (../plugins/d1/ and ../plugins/d2/) + * plugins to include = [ p1, p2, p3 ] + * d1 has plugins: p1 + * d2 has plugins: p1, p2, p3, p4 + * We expect d1/p1, d2/p2, d2/p3 to be picked up + * - ensuring second 
instance of p1 is ignored + * - ensuring p4 is ignored as it's not on the plugins to include list + */ + + String pluginsDirs = _tempDir + "/plugins/d1;" + _tempDir + "/plugins/d2;"; + String pluginsToInclude = "p1;p2;p3"; // specifically excluding p3.jar + + File pluginsDir = new File(_tempDir + "/plugins"); + pluginsDir.mkdir(); + File subPluginsDir1 = new File(pluginsDir + "/d1"); + subPluginsDir1.mkdir(); + File subPluginsDir2 = new File(pluginsDir + "/d2"); + subPluginsDir2.mkdir(); + + _p1 = new File(pluginsDir + "/d1/p1/p1.jar"); + FileUtils.touch(_p1); + _p1Copy = new File(pluginsDir + "/d2/p1/p1.jar"); + FileUtils.touch(_p1Copy); + _p2 = new File(pluginsDir + "/d2/p2/p2.jar"); + FileUtils.touch(_p2); + _p3 = new File(pluginsDir + "/d2/p3/p3.jar"); + FileUtils.touch(_p3); + _p4 = new File(pluginsDir + "/d2/p4/p4.jar"); + FileUtils.touch(_p4); + + HashMap actualPluginsMap = PluginManager.get().getPluginsToLoad(pluginsDirs, pluginsToInclude); + Assert.assertEquals(actualPluginsMap.size(), 3); + + ArrayList actualPluginNames = new ArrayList<>(); + ArrayList actualPluginPaths = new ArrayList<>(); + + for (Map.Entry entry : actualPluginsMap.entrySet()) { + actualPluginNames.add(entry.getKey()); + actualPluginPaths.add(entry.getValue().getAbsolutePath()); + } + + ArrayList expectedPluginNames = new ArrayList<>(); + expectedPluginNames.add("p1"); + expectedPluginNames.add("p2"); + expectedPluginNames.add("p3"); + ArrayList expectedPluginPaths = new ArrayList<>(); + expectedPluginPaths.add(_p1.getParentFile().getAbsolutePath()); + expectedPluginPaths.add(_p2.getParentFile().getAbsolutePath()); + expectedPluginPaths.add(_p3.getParentFile().getAbsolutePath()); + + Assert.assertEquals(actualPluginNames, expectedPluginNames); + Assert.assertEquals(actualPluginPaths, expectedPluginPaths); } @Test public void testSimple() throws Exception { + + String jarDir = _tempDir + "/test-record-reader"; + _jarFile = jarDir + "/test-record-reader.jar"; + _jarDirFile = new File(jarDir); + _jarDirFile.mkdirs(); + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); URL javaFile = Thread.currentThread().getContextClassLoader().getResource(TEST_RECORD_READER_FILE); if (javaFile != null) { @@ -141,8 +207,8 @@ public void testBackwardCompatible() { } @AfterClass - public void tearDown() { - _tempDir.delete(); + public void tearDown() throws IOException { + FileUtils.deleteDirectory(_tempDir); FileUtils.deleteQuietly(_jarDirFile); } } diff --git a/pinot-tools/src/main/resources/appAssemblerScriptTemplate b/pinot-tools/src/main/resources/appAssemblerScriptTemplate index 359ab9943460..03e7ab7c26b6 100644 --- a/pinot-tools/src/main/resources/appAssemblerScriptTemplate +++ b/pinot-tools/src/main/resources/appAssemblerScriptTemplate @@ -111,6 +111,7 @@ fi # $PLUGINS_DIR and $PLUGINS_INCLUDE are used if $PLUGINS_CLASSPATH is not set. # $PLUGINS_DIR is semi-colon separated list of plugin directories, default to '"$BASEDIR"/plugins' if not set. # $PLUGINS_INCLUDE is semi-colon separated plugins name, e.g. pinot-avro;pinot-batch-ingestion-standalone. Default is not set, which means load all the plugin jars. +# If the same plugin is found in multiple directories, it will only be picked up from the first directory it was found. The directories in $PLUGINS_DIR are traversed by their order. 
if [ -z "$PLUGINS_CLASSPATH" ] ; then if [ -z "$PLUGINS_DIR" ] ; then PLUGINS_DIR="$BASEDIR"/plugins From 04ec583b4e29610b31951c7eb3a26d400326f2ea Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Wed, 8 Dec 2021 18:32:33 -0500 Subject: [PATCH 10/11] Adding empty tail line --- .../main/java/org/apache/pinot/spi/plugin/PluginManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java index 2c4c69e8d68a..05e8215c85a2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java @@ -388,4 +388,4 @@ public void registerRecordReaderClass(String inputFormat, String recordReaderCla INPUT_FORMAT_TO_RECORD_READER_CONFIG_CLASS_NAME_MAP.put(inputFormat.toLowerCase(), recordReaderConfigClass); } } -} \ No newline at end of file +} From 471591eddeb9f398cac69e107b3caa65eee4b026 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Wed, 8 Dec 2021 19:47:50 -0500 Subject: [PATCH 11/11] Format code using style guide --- .../common/utils/TarGzCompressionUtils.java | 8 +++---- .../pinot/spi/plugin/PluginManagerTest.java | 23 +++++++++++-------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java index cd3e5138f5c2..e82d6ebbf7ee 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java @@ -56,7 +56,7 @@ private TarGzCompressionUtils() { */ public static void createTarGzFile(File inputFile, File outputFile) throws IOException { - createTarGzFile(new File[] {inputFile}, outputFile); + createTarGzFile(new File[]{inputFile}, outputFile); } /** @@ -68,9 +68,9 @@ public static void createTarGzFile(File[] inputFiles, File outputFile) Preconditions.checkArgument(outputFile.getName().endsWith(TAR_GZ_FILE_EXTENSION), "Output file: %s does not have '.tar.gz' file extension", outputFile); try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath()); - BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut); - OutputStream gzipOut = new GzipCompressorOutputStream(bufferedOut); - TarArchiveOutputStream tarGzOut = new TarArchiveOutputStream(gzipOut)) { + BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut); + OutputStream gzipOut = new GzipCompressorOutputStream(bufferedOut); + TarArchiveOutputStream tarGzOut = new TarArchiveOutputStream(gzipOut)) { tarGzOut.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR); tarGzOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java index 7dfeb09abccb..a45eec374cba 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java @@ -53,22 +53,24 @@ public class PluginManagerTest { private File _p4; @BeforeClass - public void setup() throws IOException { + public void setup() + throws IOException { _tempDir = new File(System.getProperty("java.io.tmpdir"), "pinot-plugin-test"); FileUtils.deleteDirectory(_tempDir); 
_tempDir.mkdirs(); } @Test - public void testGetPluginsToLoad() throws IOException { + public void testGetPluginsToLoad() + throws IOException { /* We have two plugin directories (../plugins/d1/ and ../plugins/d2/) - * plugins to include = [ p1, p2, p3 ] - * d1 has plugins: p1 - * d2 has plugins: p1, p2, p3, p4 - * We expect d1/p1, d2/p2, d2/p3 to be picked up - * - ensuring second instance of p1 is ignored - * - ensuring p4 is ignored as it's not on the plugins to include list - */ + * plugins to include = [ p1, p2, p3 ] + * d1 has plugins: p1 + * d2 has plugins: p1, p2, p3, p4 + * We expect d1/p1, d2/p2, d2/p3 to be picked up + * - ensuring second instance of p1 is ignored + * - ensuring p4 is ignored as it's not on the plugins to include list + */ String pluginsDirs = _tempDir + "/plugins/d1;" + _tempDir + "/plugins/d2;"; String pluginsToInclude = "p1;p2;p3"; // specifically excluding p3.jar @@ -207,7 +209,8 @@ public void testBackwardCompatible() { } @AfterClass - public void tearDown() throws IOException { + public void tearDown() + throws IOException { FileUtils.deleteDirectory(_tempDir); FileUtils.deleteQuietly(_jarDirFile); }
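
Usage sketch for reviewers (not part of the patches above): a minimal, hedged shell example of how the semicolon-separated $PLUGINS_DIR and $PLUGINS_INCLUDE handling introduced by this series might be exercised. The directory paths, plugin names, and the StartServer invocation below are illustrative assumptions, not taken from this change; the behavior it relies on — multiple directories traversed in order, first copy of a duplicated plugin wins, optional include filter — is what the patches implement.

# Illustrative only: paths, plugin names, and the admin command are assumptions.
# $PLUGINS_DIR may now list several plugin directories, separated by semicolons;
# if the same plugin exists in more than one directory, the earlier directory wins.
export PLUGINS_DIR="/opt/pinot/plugins;/opt/pinot/custom-plugins"
# $PLUGINS_INCLUDE (optional) restricts loading to the named plugins.
export PLUGINS_INCLUDE="pinot-avro;pinot-batch-ingestion-standalone"
bin/pinot-admin.sh StartServer -zkAddress localhost:2181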