diff --git a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java index e3912053bede..4b7d9df74387 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java @@ -33,6 +33,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; +import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; @@ -236,27 +237,22 @@ private static String fetchUrl(URL url, String authToken) /** - * Find matching files from root directory specified in fileUri. - * If includePattern and excludePattern are not null, get all the files that match includePattern and exclude files - * that match excludePattern. - * If - * - * @param pinotFs root directly fs - * @param fileUri root directly uri + * @param pinotFs root directory fs + * @param fileUri root directory uri * @param includePattern optional glob patterns for files to include * @param excludePattern optional glob patterns for files to exclude - * @param searchRecrusively if ture, search files recursively from directory specified in fileUri + * @param searchRecursively if ture, search files recursively from directory specified in fileUri * @return list of matching files. * @throws IOException on IO failure for list files in root directory. * @throws URISyntaxException for matching file URIs * @throws RuntimeException if there is no matching file. */ - public static List listMatchedFilesWithRecursiveOption(PinotFS pinotFs, URI fileUri, String includePattern, - String excludePattern, boolean searchRecrusively) + public static List listMatchedFilesWithRecursiveOption(PinotFS pinotFs, URI fileUri, + @Nullable String includePattern, @Nullable String excludePattern, boolean searchRecursively) throws Exception { String[] files; // listFiles throws IOException - files = pinotFs.listFiles(fileUri, searchRecrusively); + files = pinotFs.listFiles(fileUri, searchRecursively); //TODO: sort input files based on creation time PathMatcher includeFilePathMatcher = null; if (includePattern != null) { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pinot-plugins.tar.gz b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pinot-plugins.tar.gz deleted file mode 100644 index 64b5366c0e2f..000000000000 Binary files a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pinot-plugins.tar.gz and /dev/null differ diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index fa015be2b858..0ea86376ec9c 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -140,6 +140,14 @@ public void run() for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } + //Get list of files to process + URI inputDirURI = new URI(_spec.getInputDirURI()); + if (inputDirURI.getScheme() == null) { + inputDirURI = new File(_spec.getInputDirURI()).toURI(); + } + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + List filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, + _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); //Get outputFS for writing output pinot segments URI outputDirURI = new URI(_spec.getOutputDirURI()); @@ -172,15 +180,6 @@ public void run() Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), SEGMENT_TAR_SUBDIR_NAME); outputDirFS.mkdir(stagingSegmentTarUri.toUri()); - //Get list of files to process - URI inputDirURI = new URI(_spec.getInputDirURI()); - if (inputDirURI.getScheme() == null) { - inputDirURI = new File(_spec.getInputDirURI()).toURI(); - } - PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); - List filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, - _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); - // numDataFiles is guaranteed to be greater than zero since listMatchedFilesWithRecursiveOption will throw // runtime exception if the matched files list is empty. int numDataFiles = filteredFiles.size(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index 1f9715d611f6..ceaf2b1b9c5f 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -126,6 +126,15 @@ public void run() for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } + //Get list of files to process + URI inputDirURI = new URI(_spec.getInputDirURI()); + if (inputDirURI.getScheme() == null) { + inputDirURI = new File(_spec.getInputDirURI()).toURI(); + } + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + List filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, + _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); + LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); //Get outputFS for writing output pinot segments URI outputDirURI = new URI(_spec.getOutputDirURI()); if (outputDirURI.getScheme() == null) { @@ -149,16 +158,6 @@ public void run() } outputDirFS.mkdir(stagingDirURI); } - //Get list of files to process - //Get pinotFS for input - URI inputDirURI = new URI(_spec.getInputDirURI()); - if (inputDirURI.getScheme() == null) { - inputDirURI = new File(_spec.getInputDirURI()).toURI(); - } - PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); - List filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, - _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); - LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); try { JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java index 3b327a4ea145..46db5818a29e 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java @@ -126,6 +126,15 @@ public void run() for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } + //Get list of files to process + URI inputDirURI = new URI(_spec.getInputDirURI()); + if (inputDirURI.getScheme() == null) { + inputDirURI = new File(_spec.getInputDirURI()).toURI(); + } + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + List filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, + _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); + LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); //Get outputFS for writing output pinot segments URI outputDirURI = new URI(_spec.getOutputDirURI()); if (outputDirURI.getScheme() == null) { @@ -149,15 +158,6 @@ public void run() } outputDirFS.mkdir(stagingDirURI); } - //Get list of files to process - URI inputDirURI = new URI(_spec.getInputDirURI()); - if (inputDirURI.getScheme() == null) { - inputDirURI = new File(_spec.getInputDirURI()).toURI(); - } - PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); - List filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, - _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); - LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); try { JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java index 7b0f22326ae4..a1ab0ad611c6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java @@ -32,7 +32,8 @@ public class BatchConfig { private final FileFormat _inputFormat; private final String _inputDirURI; - private final boolean _searchRecursively; + // TODO: update the default value to false. + private boolean _searchRecursively = true; private final String _inputFsClassName; private final Map _inputFsProps; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java index 9029038cc98b..02e52b26c14f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java @@ -50,6 +50,7 @@ public class SegmentGenerationJobSpec implements Serializable { */ // TODO: set the default value to false after all clients are aware of this. private boolean _searchRecursively = true; + /** * include file name pattern, supported glob pattern. * Sample usage: @@ -173,6 +174,7 @@ public boolean isSearchRecursively() { public void setSearchRecursively(boolean searchRecursively) { _searchRecursively = searchRecursively; } + public String getIncludeFileNamePattern() { return _includeFileNamePattern; }