Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support loading plugins from multiple directories #7871

Merged
merged 11 commits into from
Dec 14, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ private TarGzCompressionUtils() {
*/
public static void createTarGzFile(File inputFile, File outputFile)
    throws IOException {
  // A single input is just the one-element case of the multi-input overload, so wrap it and delegate.
  File[] singleInput = {inputFile};
  createTarGzFile(singleInput, outputFile);
}

/**
* Creates a tar.gz file from a list of input file/directories to the output file. The output file must have
* ".tar.gz" as the file extension.
*/
public static void createTarGzFile(File[] inputFiles, File outputFile)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we check and skip the nested paths?
E.g. one directory is a/b/c and one file a/b/c/d.file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiangfu0 can you clarify what you mean, so a/b/c/d.file would be skipped since it's included already via a/b/c?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If the override happens, then it's not something to worry about.

throws IOException {
Preconditions.checkArgument(outputFile.getName().endsWith(TAR_GZ_FILE_EXTENSION),
"Output file: %s does not have '.tar.gz' file extension", outputFile);
try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath());
Expand All @@ -64,7 +73,10 @@ public static void createTarGzFile(File inputFile, File outputFile)
TarArchiveOutputStream tarGzOut = new TarArchiveOutputStream(gzipOut)) {
tarGzOut.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR);
tarGzOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
addFileToTarGz(tarGzOut, inputFile, "");

for (File inputFile : inputFiles) {
addFileToTarGz(tarGzOut, inputFile, "");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,57 @@ public void testFile()
assertEquals(FileUtils.readFileToString(untarredFile), fileContent);
}

@Test
public void testDirectories()
throws IOException {
String dirToTarName1 = "dir1";
String dirToTarName2 = "dir2";
File dir1 = new File(DATA_DIR, dirToTarName1);
File dir2 = new File(DATA_DIR, dirToTarName2);

File[] dirsToTar = new File[] {dir1, dir2};

String fileName1 = "data1";
String fileContent1 = "fileContent1";
String fileName2 = "data2";
String fileContent2 = "fileContent2";
FileUtils.write(new File(dir1, fileName1), fileContent1);
FileUtils.write(new File(dir2, fileName2), fileContent2);

String outputTarName = "output_tar" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION;
File tarGzFile = new File(TAR_DIR, outputTarName);
TarGzCompressionUtils.createTarGzFile(dirsToTar, tarGzFile);

List<File> untarredFiles = TarGzCompressionUtils.untar(tarGzFile, UNTAR_DIR);
assertEquals(untarredFiles.size(), 4);

/*
untarredFiles ends up being a list as follows:
/dir1/
/dir1/data1
/dir2/
/dir2/data2

To fetch the 2 directories we want for the following assertions, we expect them at indexes 0 and 2.
*/
File untarredFileDir1 = untarredFiles.get(0);
File untarredFileDir2 = untarredFiles.get(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comments on why the 1st and 3rd would be fetched.


assertEquals(untarredFileDir1, new File(UNTAR_DIR, dirToTarName1));
assertEquals(untarredFileDir2, new File(UNTAR_DIR, dirToTarName2));

File[] filesDir1 = untarredFileDir1.listFiles();
assertNotNull(filesDir1);
assertEquals(filesDir1.length, 1);
assertEquals(FileUtils.readFileToString(new File(untarredFileDir1, fileName1)), fileContent1);


File[] filesDir2 = untarredFileDir2.listFiles();
assertNotNull(filesDir2);
assertEquals(filesDir2.length, 1);
assertEquals(FileUtils.readFileToString(new File(untarredFileDir2, fileName2)), fileContent2);
}

@Test
public void testDirectory()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.yaml.snakeyaml.Yaml;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pinot.common.segment.generation.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;


Expand Down Expand Up @@ -390,27 +391,41 @@ protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS, URI
}

protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) {
File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
if (pluginsRootDir.exists()) {
try {
File pluginsTarGzFile = File.createTempFile("pinot-plugins-", ".tar.gz");
TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);

// Copy to staging directory
Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri());
job.addCacheFile(cachedPluginsTarball.toUri());
} catch (Exception e) {
LOGGER.error("Failed to tar plugins directory and upload to staging dir", e);
throw new RuntimeException(e);
}
String[] pluginDirectories = PluginManager.get().getPluginsDirectories();
if (pluginDirectories == null) {
LOGGER.warn("Plugin directories is null, nothing to pack to distributed cache");
return;
}

String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
if (pluginsIncludes != null) {
job.getConfiguration().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes);
ArrayList<File> validPluginDirectories = new ArrayList();

for (String pluginsDirPath : pluginDirectories) {
File pluginsDir = new File(pluginsDirPath);
if (pluginsDir.exists()) {
validPluginDirectories.add(pluginsDir);
} else {
LOGGER.warn("Cannot find Pinot plugins directory at [{}]", pluginsDirPath);
return;
}
} else {
LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", pluginsRootDir);
}

File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
File[] files = validPluginDirectories.toArray(new File[0]);
TarGzCompressionUtils.createTarGzFile(files, pluginsTarGzFile);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this untar later? If I have

/a/b
/a/c
/b/c

will it come back out the same way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original createTarGzFile, which accepted a single File object, also accepted directories and handled them recursively: if I call createTarGzFile with one directory (/a/...), that entire directory and its children will be tarred.

now, using the new method if I call it on let's say [ /a/ and /b/ ], the dir name is used as the baseEntryName (see ln 89 in TarGzCompressionUtils.java).

So effectively, if you tar two directories /a/ and /b/, it should come back out the same way as you pasted above.


// Copy to staging directory
Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri());
job.addCacheFile(cachedPluginsTarball.toUri());
} catch (Exception e) {
LOGGER.error("Failed to tar plugins directories and upload to staging dir", e);
throw new RuntimeException(e);
}

String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
if (pluginsIncludes != null) {
job.getConfiguration().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,26 +374,35 @@ protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, Strin
}

protected void packPluginsToDistributedCache(JavaSparkContext sparkContext) {
String pluginsRootDirPath = PluginManager.get().getPluginsRootDir();
if (pluginsRootDirPath == null) {
LOGGER.warn("Local Pinot plugins directory is null, skip packaging...");
String[] pluginDirectories = PluginManager.get().getPluginsDirectories();
if (pluginDirectories == null) {
LOGGER.warn("Plugin directories is null, skipping packaging...");
return;
}
File pluginsRootDir = new File(pluginsRootDirPath);
if (pluginsRootDir.exists()) {
File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
} catch (IOException e) {
LOGGER.error("Failed to tar plugins directory", e);
}
sparkContext.addFile(pluginsTarGzFile.getAbsolutePath());
String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
if (pluginsIncludes != null) {
sparkContext.getConf().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes);

ArrayList<File> validPluginDirectories = new ArrayList();

for (String pluginsDirPath : pluginDirectories) {
File pluginsDir = new File(pluginsDirPath);
if (pluginsDir.exists()) {
validPluginDirectories.add(pluginsDir);
} else {
LOGGER.warn("Cannot find Pinot plugins directory at [{}]", pluginsDirPath);
return;
}
} else {
LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", pluginsRootDirPath);
}

File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
File[] files = validPluginDirectories.toArray(new File[0]);
TarGzCompressionUtils.createTarGzFile(files, pluginsTarGzFile);
} catch (IOException e) {
LOGGER.error("Failed to tar plugins directories", e);
}
sparkContext.addFile(pluginsTarGzFile.getAbsolutePath());
String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
if (pluginsIncludes != null) {
sparkContext.getConf().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes);
}
}
}
118 changes: 84 additions & 34 deletions pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.plugin;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -110,7 +111,7 @@ public class PluginManager {
};

private Map<Plugin, PluginClassLoader> _registry = new HashMap<>();
private String _pluginsRootDir;
private String _pluginsDirectories;
private String _pluginsInclude;
private boolean _initialized = false;

Expand All @@ -124,58 +125,104 @@ public synchronized void init() {
return;
}
try {
_pluginsRootDir = System.getProperty(PLUGINS_DIR_PROPERTY_NAME);
_pluginsDirectories = System.getProperty(PLUGINS_DIR_PROPERTY_NAME);
} catch (Exception e) {
LOGGER.error("Failed to load env variable {}", PLUGINS_DIR_PROPERTY_NAME, e);
_pluginsRootDir = null;
_pluginsDirectories = null;
}
try {
_pluginsInclude = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
} catch (Exception e) {
LOGGER.error("Failed to load env variable {}", PLUGINS_INCLUDE_PROPERTY_NAME, e);
_pluginsInclude = null;
}
init(_pluginsRootDir, _pluginsInclude);
init(_pluginsDirectories, _pluginsInclude);
_initialized = true;
}

private void init(String pluginsRootDir, String pluginsInclude) {
if (StringUtils.isEmpty(pluginsRootDir)) {
private void init(String pluginsDirectories, String pluginsInclude) {
if (StringUtils.isEmpty(pluginsDirectories)) {
LOGGER.info("Env variable '{}' is not specified. Set this env variable to load additional plugins.",
PLUGINS_DIR_PROPERTY_NAME);
return;
} else {
if (!new File(pluginsRootDir).exists()) {
LOGGER.warn("Plugins root dir [{}] doesn't exist.", pluginsRootDir);
return;
try {
HashMap<String, File> plugins = getPluginsToLoad(pluginsDirectories, pluginsInclude);
LOGGER.info("#getPluginsToLoad has produced {} plugins to load", plugins.size());

for (Map.Entry<String, File> entry : plugins.entrySet()) {
String pluginName = entry.getKey();
File pluginDir = entry.getValue();

try {
load(pluginName, pluginDir);
LOGGER.info("Successfully Loaded plugin [{}] from dir [{}]", pluginName, pluginDir);
} catch (Exception e) {
LOGGER.error("Failed to load plugin [{}] from dir [{}]", pluginName, pluginDir, e);
}
}

initRecordReaderClassMap();
} catch (IllegalArgumentException e) {
LOGGER.warn(e.getMessage());
}
LOGGER.info("Plugins root dir is [{}]", pluginsRootDir);
}
Collection<File> jarFiles = FileUtils.listFiles(new File(pluginsRootDir), new String[]{JAR_FILE_EXTENSION}, true);
List<String> pluginsToLoad = null;
if (!StringUtils.isEmpty(pluginsInclude)) {
pluginsToLoad = Arrays.asList(pluginsInclude.split(","));
LOGGER.info("Trying to load plugins: [{}]", Arrays.toString(pluginsToLoad.toArray()));
} else {
LOGGER.info("Please use env variable '{}' to customize plugins to load. Loading all plugins: {}",
PLUGINS_INCLUDE_PROPERTY_NAME, Arrays.toString(jarFiles.toArray()));
}
for (File jarFile : jarFiles) {
File pluginDir = jarFile.getParentFile();
String pluginName = pluginDir.getName();
if (pluginsToLoad != null) {
if (!pluginsToLoad.contains(pluginName)) {
continue;
}
}

/**
* Builds the map of plugins to load from a semi-colon delimited string of directories and an optional semi-colon
* delimited string of plugin names. The directories are traversed in the order given and searched recursively for
* jar files; the name of the directory containing a jar is used as the plugin name.
* If a plugin is found in multiple directories, only the first copy of it will be picked up.
* @param pluginsDirectories semi-colon delimited list of directories to scan; every listed directory must exist
* @param pluginsInclude semi-colon delimited list of plugin names to keep; when null or empty, every plugin found
*     is included
* @return A hash map with key = plugin name, value = the directory that contains the plugin's jar file(s)
* @throws IllegalArgumentException if any of the listed directories does not exist
*/
@VisibleForTesting
public HashMap<String, File> getPluginsToLoad(String pluginsDirectories, String pluginsInclude) throws
IllegalArgumentException {
String[] directories = pluginsDirectories.split(";");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it is more common to use , as the array separator (e.g. in Apache commons configuration). Is there some special reason why picking ; as the separator here?

(minor) Use StringUtils.split(pluginsDirectories, ',') for slightly better performance (avoid regex checking)

Copy link
Contributor Author

@priyen priyen Dec 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used ; as the plugin to include property also used it to be consistent, but I am fine with changing it to ,, what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to priyen. plugins themselves use ;. and most PATH-like things use ; as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that the pluginsInclude separator is changed from ',' to ';' in this PR which can cause backward incompatibility. Does it make sense to allow both as separator?

Copy link
Contributor Author

@priyen priyen Dec 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit odd, the master bash code in pinot-tools/src/main/resources/appAssemblerScriptTemplate is using export IFS=";" when looping through $PLUGINS_INCLUDE ..now I'm wondering if pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java ln 157 pluginsToLoad = Arrays.asList(pluginsInclude.split(",")); in master was even working in the first place..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiangfu0 Can you please take a look?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

friendly ping, any ideas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now Pinot uses appAssemblerScriptTemplate to set all the plugins into java classpath.
For now, let's use the semi-colon as the delimiter.

For PluginManager.java, we should also follow the same delimiter convention to use semi-colon. For pluginInclude, we can make the colon as backward compatible.

LOGGER.info("Plugin directories env: {}, parsed directories to load: '{}'", pluginsDirectories, directories);

HashMap<String, File> finalPluginsToLoad = new HashMap<>();

for (String pluginsDirectory : directories) {
if (!new File(pluginsDirectory).exists()) {
// String.format uses java.util.Formatter syntax: "%s" is substituted, whereas the SLF4J-style "{}" would be
// emitted literally and the offending directory would never appear in the message.
throw new IllegalArgumentException(String.format("Plugins dir [%s] doesn't exist.", pluginsDirectory));
}
try {
load(pluginName, pluginDir);
LOGGER.info("Successfully Loaded plugin [{}] from dir [{}]", pluginName, pluginDir);
} catch (Exception e) {
LOGGER.error("Failed to load plugin [{}] from dir [{}]", pluginName, pluginDir, e);

Collection<File> jarFiles = FileUtils.listFiles(
new File(pluginsDirectory),
new String[]{JAR_FILE_EXTENSION},
true
);
List<String> pluginsToLoad = null;
if (!StringUtils.isEmpty(pluginsInclude)) {
pluginsToLoad = Arrays.asList(pluginsInclude.split(";"));
LOGGER.info("Potential plugins to load: [{}]", Arrays.toString(pluginsToLoad.toArray()));
} else {
LOGGER.info("Please use env variable '{}' to customize plugins to load. Loading all plugins: {}",
PLUGINS_INCLUDE_PROPERTY_NAME, Arrays.toString(jarFiles.toArray()));
}

for (File jarFile : jarFiles) {
File pluginDir = jarFile.getParentFile();
String pluginName = pluginDir.getName();
LOGGER.info("Found plugin, pluginDir: {}, pluginName: {}", pluginDir, pluginName);
if (pluginsToLoad != null) {
if (!pluginsToLoad.contains(pluginName)) {
LOGGER.info("Skipping plugin: {} is not inside pluginsToLoad {}", pluginName, pluginsToLoad);
continue;
}
}

if (!finalPluginsToLoad.containsKey(pluginName)) {
finalPluginsToLoad.put(pluginName, pluginDir);
LOGGER.info("Added [{}] from dir [{}] to final list of plugins to load", pluginName, pluginDir);
}
}
}
initRecordReaderClassMap();

return finalPluginsToLoad;
}

private void initRecordReaderClassMap() {
Expand Down Expand Up @@ -312,8 +359,11 @@ public <T> T createInstance(String pluginName, String className, Class[] argType
return (T) instance;
}

public String getPluginsRootDir() {
return _pluginsRootDir;
public String[] getPluginsDirectories() {
  // No plugins directories were configured: report that to callers as null rather than an empty array.
  if (_pluginsDirectories == null) {
    return null;
  }
  // The configured value is a semi-colon delimited list of directories.
  return _pluginsDirectories.split(";");
}

public static PluginManager get() {
Expand Down
Loading