Skip to content

Commit

Permalink
Disable recursion in PinotFS copy (#8162)
Browse files Browse the repository at this point in the history
* Disable recursion in PinotFS copy

All uses of PinotFS copy API involve copying a tarred segment and
untarring it. So, copying a directory recursively will not work (the
untar will fail). It also results in wastage of effort in copying
across file systems.

Also disabled the file scheme in during segment upload on the controller,
since the URL based upload is meant to provide an external URL to be
picked up by the controller.

* Removed redundant comment

* Fix to add a different API for recursive copy

* Fix lint errors
  • Loading branch information
mcvsubbu authored Feb 8, 2022
1 parent d479ebc commit 1382d29
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public static void fetchSegmentToLocal(String uri, File dest)

private void fetchSegmentToLocalInternal(URI uri, File dest)
throws Exception {
// caller untars
getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ private void downloadSegmentFileFromURI(String currentSegmentLocationURI, File d
}
LOGGER.info("Downloading segment from {} to {} for table {}", currentSegmentLocationURI, destFile.getAbsolutePath(),
tableName);
URI uri = new URI(currentSegmentLocationURI);
if (uri.getScheme().equalsIgnoreCase("file")) {
throw new ControllerApplicationException(LOGGER, "Unsupported URI: " + currentSegmentLocationURI,
Response.Status.BAD_REQUEST);
}
SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI, destFile);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void run()
job.getConfiguration().setBoolean(MRJobConfig.MAP_SPECULATIVE, false);

// But we have to copy ourselves to HDFS, and add us to the distributed cache, so
// that the mapper code is available.
// that the mapper code is available.
addMapperJarToDistributedCache(job, outputDirFS, stagingDirURI);

org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
Expand All @@ -277,7 +277,7 @@ public void run()
// In order to ensure pinot plugins would be loaded to each worker, this method
// tars entire plugins directory and set this file into Distributed cache.
// Then each mapper job will untar the plugin tarball, and set system properties accordingly.
// Note that normally we'd just use Hadoop's support for putting jars on the
// Note that normally we'd just use Hadoop's support for putting jars on the
// classpath via the distributed cache, but some of the plugins (e.g. the pinot-parquet
// input format) include Hadoop classes, which can be incompatibile with the Hadoop
// installation/jars being used to run the mapper, leading to errors such as:
Expand Down Expand Up @@ -386,7 +386,7 @@ protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS, URI
throws Exception {
File ourJar = new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI());
Path distributedCacheJar = new Path(stagingDirURI.toString(), ourJar.getName());
outputDirFS.copyFromLocalFile(ourJar, distributedCacheJar.toUri());
outputDirFS.copyFromLocalDir(ourJar, distributedCacheJar.toUri());
job.addFileToClassPath(distributedCacheJar);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public void copyToLocalFile(URI srcUri, File dstFile)
if (_hadoopFS == null) {
throw new RuntimeException("_hadoopFS client is not initialized when trying to copy files");
}
if (_hadoopFS.isDirectory(remoteFile)) {
throw new IllegalArgumentException(srcUri.toString() + " is a direactory");
}
long startMs = System.currentTimeMillis();
_hadoopFS.copyToLocalFile(remoteFile, localFile);
LOGGER.debug("copied {} from hdfs to {} in local for size {}, take {} ms", srcUri, dstFilePath, dstFile.length(),
Expand All @@ -196,9 +199,21 @@ public void copyToLocalFile(URI srcUri, File dstFile)
@Override
public void copyFromLocalFile(File srcFile, URI dstUri)
throws Exception {
if (srcFile.isDirectory()) {
throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is a direactory");
}
_hadoopFS.copyFromLocalFile(new Path(srcFile.toURI()), new Path(dstUri));
}

public void copyFromLocalDir(File srcFile, URI dstUri)
throws Exception {
Path srcPath = new Path(srcFile.toURI());
if (!_hadoopFS.isDirectory(srcPath)) {
throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is not a directory");
}
_hadoopFS.copyFromLocalFile(srcPath, new Path(dstUri));
}

@Override
public boolean isDirectory(URI uri) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public boolean doMove(URI srcUri, URI dstUri)
@Override
public boolean copy(URI srcUri, URI dstUri)
throws IOException {
copy(toFile(srcUri), toFile(dstUri));
copy(toFile(srcUri), toFile(dstUri), false);
return true;
}

Expand Down Expand Up @@ -118,13 +118,22 @@ public String[] listFiles(URI fileUri, boolean recursive)
@Override
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
copy(toFile(srcUri), dstFile);
copy(toFile(srcUri), dstFile, false);
}

@Override
public void copyFromLocalFile(File srcFile, URI dstUri)
throws Exception {
copy(srcFile, toFile(dstUri));
copy(srcFile, toFile(dstUri), false);
}

@Override
public void copyFromLocalDir(File srcFile, URI dstUri)
throws Exception {
if (!srcFile.isDirectory()) {
throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is not a directory");
}
copy(srcFile, toFile(dstUri), true);
}

@Override
Expand Down Expand Up @@ -163,14 +172,18 @@ private static File toFile(URI uri) {
}
}

private static void copy(File srcFile, File dstFile)
private static void copy(File srcFile, File dstFile, boolean recursive)
throws IOException {
if (dstFile.exists()) {
FileUtils.deleteQuietly(dstFile);
}
if (srcFile.isDirectory()) {
// Throws Exception on failure
FileUtils.copyDirectory(srcFile, dstFile);
if (recursive) {
FileUtils.copyDirectory(srcFile, dstFile);
} else {
// Throws Exception on failure
throw new IOException(srcFile.getAbsolutePath() + " is a directory");
}
} else {
// Will create parent directories, throws Exception on failure
FileUtils.copyFile(srcFile, dstFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,29 @@ String[] listFiles(URI fileUri, boolean recursive)

/**
* Copies a file from a remote filesystem to the local one. Keeps the original file.
* @param srcUri location of current file on remote filesystem
* @param srcUri location of current file on remote filesystem (must not be a directory)
* @param dstFile location of destination on local filesystem
* @throws Exception if srcUri is not valid or not present, or timeout when downloading file to local
*/
void copyToLocalFile(URI srcUri, File dstFile)
throws Exception;

/**
* @apiNote This API is to be used with caution, since recursive copies can lead to adverse situations.
*
* Add srcFile to filesystem at the given dst name and the source is kept intact afterwards.
* @param srcFile location of src file on local disk (must be a directory)
* @param dstUri location of dst on remote filesystem
* @throws Exception if fileUri is not valid or not present, or timeout when uploading file from local
*/
default void copyFromLocalDir(File srcFile, URI dstUri)
throws Exception {
throw new UnsupportedOperationException("Recursive copy not supported");
}
/**
* The src file is on the local disk. Add it to filesystem at the given dst name and the source is kept intact
* afterwards.
* @param srcFile location of src file on local disk
* @param srcFile location of src file on local disk (must not be a directory)
* @param dstUri location of dst on remote filesystem
* @throws Exception if fileUri is not valid or not present, or timeout when uploading file from local
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,41 +205,12 @@ public void testFS()
// Expected.
}

// Check that directory only copy worked
localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(secondTempDir.toURI()));

// Copying directory with files to directory with files
File testFile = new File(firstTempDir, "testFile");
Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
File newTestFile = new File(secondTempDir, "newTestFile");
Assert.assertTrue(newTestFile.createNewFile(), "Could not create file " + newTestFile.getPath());

localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), true).length, 1);

// Copying directory with files under another directory.
File firstTempDirUnderSecondTempDir = new File(secondTempDir, firstTempDir.getName());
localPinotFS.copy(firstTempDir.toURI(), firstTempDirUnderSecondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(firstTempDirUnderSecondTempDir.toURI()));
// There're two files/directories under secondTempDir.
Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 2);
// The file under src directory also got copied under dst directory.
Assert.assertEquals(localPinotFS.listFiles(firstTempDirUnderSecondTempDir.toURI(), true).length, 1);

// len of dir = exception
try {
localPinotFS.length(firstTempDir.toURI());
fail();
} catch (IllegalArgumentException e) {

}

Assert.assertTrue(testFile.exists());

localPinotFS.copyFromLocalFile(testFile, secondTestFileUri);
Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
localPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
}
}

0 comments on commit 1382d29

Please sign in to comment.