Skip to content

Commit

Permalink
get input files before output directory
Browse files Browse the repository at this point in the history
  • Loading branch information
Yao Liu committed Aug 24, 2022
1 parent 59ad7f1 commit 7e2cc0c
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -236,27 +237,22 @@ private static String fetchUrl(URL url, String authToken)


/**
* Find matching files from root directory specified in fileUri.
* If includePattern and excludePattern are not null, get all the files that match includePattern and exclude files
* that match excludePattern.
* If
*
* @param pinotFs root directly fs
* @param fileUri root directly uri
* @param pinotFs root directory fs
* @param fileUri root directory uri
* @param includePattern optional glob patterns for files to include
* @param excludePattern optional glob patterns for files to exclude
* @param searchRecrusively if ture, search files recursively from directory specified in fileUri
* @param searchRecursively if ture, search files recursively from directory specified in fileUri
* @return list of matching files.
* @throws IOException on IO failure for list files in root directory.
* @throws URISyntaxException for matching file URIs
* @throws RuntimeException if there is no matching file.
*/
public static List<String> listMatchedFilesWithRecursiveOption(PinotFS pinotFs, URI fileUri, String includePattern,
String excludePattern, boolean searchRecrusively)
public static List<String> listMatchedFilesWithRecursiveOption(PinotFS pinotFs, URI fileUri,
@Nullable String includePattern, @Nullable String excludePattern, boolean searchRecursively)
throws Exception {
String[] files;
// listFiles throws IOException
files = pinotFs.listFiles(fileUri, searchRecrusively);
files = pinotFs.listFiles(fileUri, searchRecursively);
//TODO: sort input files based on creation time
PathMatcher includeFilePathMatcher = null;
if (includePattern != null) {
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ public void run()
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
//Get list of files to process
URI inputDirURI = new URI(_spec.getInputDirURI());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(_spec.getInputDirURI()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
_spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());

//Get outputFS for writing output pinot segments
URI outputDirURI = new URI(_spec.getOutputDirURI());
Expand Down Expand Up @@ -172,15 +180,6 @@ public void run()
Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), SEGMENT_TAR_SUBDIR_NAME);
outputDirFS.mkdir(stagingSegmentTarUri.toUri());

//Get list of files to process
URI inputDirURI = new URI(_spec.getInputDirURI());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(_spec.getInputDirURI()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
_spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());

// numDataFiles is guaranteed to be greater than zero since listMatchedFilesWithRecursiveOption will throw
// runtime exception if the matched files list is empty.
int numDataFiles = filteredFiles.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public void run()
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
//Get list of files to process
URI inputDirURI = new URI(_spec.getInputDirURI());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(_spec.getInputDirURI()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
_spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());
LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size());
//Get outputFS for writing output pinot segments
URI outputDirURI = new URI(_spec.getOutputDirURI());
if (outputDirURI.getScheme() == null) {
Expand All @@ -149,16 +158,6 @@ public void run()
}
outputDirFS.mkdir(stagingDirURI);
}
//Get list of files to process
//Get pinotFS for input
URI inputDirURI = new URI(_spec.getInputDirURI());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(_spec.getInputDirURI()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
_spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());
LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size());
try {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public void run()
for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
//Get list of files to process
URI inputDirURI = new URI(_spec.getInputDirURI());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(_spec.getInputDirURI()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
_spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());
LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size());
//Get outputFS for writing output pinot segments
URI outputDirURI = new URI(_spec.getOutputDirURI());
if (outputDirURI.getScheme() == null) {
Expand All @@ -149,15 +158,6 @@ public void run()
}
outputDirFS.mkdir(stagingDirURI);
}
//Get list of files to process
URI inputDirURI = new URI(_spec.getInputDirURI());
if (inputDirURI.getScheme() == null) {
inputDirURI = new File(_spec.getInputDirURI()).toURI();
}
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI,
_spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively());
LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size());
try {
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class BatchConfig {

private final FileFormat _inputFormat;
private final String _inputDirURI;
private final boolean _searchRecursively;
// TODO: update the default value to false.
private boolean _searchRecursively = true;
private final String _inputFsClassName;
private final Map<String, String> _inputFsProps;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class SegmentGenerationJobSpec implements Serializable {
*/
// TODO: set the default value to false after all clients are aware of this.
private boolean _searchRecursively = true;

/**
* include file name pattern, supported glob pattern.
* Sample usage:
Expand Down Expand Up @@ -173,6 +174,7 @@ public boolean isSearchRecursively() {
public void setSearchRecursively(boolean searchRecursively) {
_searchRecursively = searchRecursively;
}

public String getIncludeFileNamePattern() {
return _includeFileNamePattern;
}
Expand Down

0 comments on commit 7e2cc0c

Please sign in to comment.