Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add copy recursive API to pinotFS #8200

Merged
merged 4 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
try {
return _localPinotFS.copy(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
return _localPinotFS.copyDir(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public void call(String pathAndIdx)
if (stagingDirURI != null) {
LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
outputDirURI);
outputDirFS.copy(stagingDirURI, outputDirURI);
outputDirFS.copyDir(stagingDirURI, outputDirURI);
}
} finally {
if (stagingDirURI != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public boolean doMove(URI srcUri, URI dstUri)
* @return true if copy succeeds else false.
*/
@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
LOGGER.debug("copy is called with srcUri='{}', dstUri='{}'", srcUri, dstUri);
// If src and dst are the same, do nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,13 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
// TODO: support directory copy.
if (isDirectory(srcUri)) {
throw new UnsupportedOperationException("Azure FS doesn't support directory recursive copy!");
}

if (exists(dstUri)) {
delete(dstUri, true);
}
Expand All @@ -126,9 +131,8 @@ public boolean copy(URI srcUri, URI dstUri)
inputStream.close();
outputStream.close();
} catch (IOException e) {
LOGGER
.error("Exception encountered during copy, input: '{}', output: '{}'.", srcUri.toString(), dstUri.toString(),
e);
LOGGER.error("Exception encountered during copy, input: '{}', output: '{}'.", srcUri, dstUri, e);
return false;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ public boolean doMove(URI srcUri, URI dstUri)
GcsUri srcGcsUri = new GcsUri(srcUri);
GcsUri dstGcsUri = new GcsUri(dstUri);
if (copy(srcGcsUri, dstGcsUri)) {
// Only delete if all files were successfully moved
// Only delete if all files were successfully copied
return delete(srcGcsUri, true);
}
return false;
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri);
return copy(new GcsUri(srcUri), new GcsUri(dstUri));
Expand Down Expand Up @@ -414,9 +414,11 @@ private boolean copy(GcsUri srcUri, GcsUri dstUri)
if (srcUri.equals(dstUri)) {
return true;
}
// copy directly if source is a single file.
if (!existsDirectory(srcUri)) {
return copyFile(srcUri, dstUri);
}
// copy directory
if (srcUri.hasSubpath(dstUri) || dstUri.hasSubpath(srcUri)) {
throw new IOException(String.format("Cannot copy from or to a subdirectory: '%s' -> '%s'", srcUri, dstUri));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testGcs()

// Test copy directory -> directory
GcsUri gcsDirectoryUriCopy = createTempDirectoryGcsUri();
_pinotFS.copy(gcsDirectoryUri.getUri(), gcsDirectoryUriCopy.getUri());
_pinotFS.copyDir(gcsDirectoryUri.getUri(), gcsDirectoryUriCopy.getUri());

Set<GcsUri> expectedElementsCopy = new HashSet<>();
String directoryName = Paths.get(gcsDirectoryUri.getPath()).getFileName().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean doMove(URI srcUri, URI dstUri)
* need to create a new configuration and filesystem. Keeps files if copy/move is partial.
*/
@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
Path source = new Path(srcUri);
Path target = new Path(dstUri);
Expand All @@ -115,7 +115,7 @@ public boolean copy(URI srcUri, URI dstUri)
}
} else if (sourceFile.isDirectory()) {
try {
copy(sourceFilePath.toUri(), new Path(target, sourceFilePath.getName()).toUri());
copyDir(sourceFilePath.toUri(), new Path(target, sourceFilePath.getName()).toUri());
} catch (FileNotFoundException e) {
LOGGER.warn("Not found directory {}, skipping copying it...", sourceFilePath, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testCopy()
hadoopFS.touch(new Path(baseURI.getPath(), "src/dir/2").toUri());
String[] srcFiles = hadoopFS.listFiles(new Path(baseURI.getPath(), "src").toUri(), true);
Assert.assertEquals(srcFiles.length, 3);
hadoopFS.copy(new Path(baseURI.getPath(), "src").toUri(), new Path(baseURI.getPath(), "dest").toUri());
hadoopFS.copyDir(new Path(baseURI.getPath(), "src").toUri(), new Path(baseURI.getPath(), "dest").toUri());
Assert.assertTrue(hadoopFS.exists(new Path(baseURI.getPath(), "dest").toUri()));
Assert.assertTrue(hadoopFS.exists(new Path(baseURI.getPath(), "dest/dir").toUri()));
Assert.assertTrue(hadoopFS.exists(new Path(baseURI.getPath(), "dest/dir/1").toUri()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,14 @@ public boolean delete(URI segmentUri, boolean forceDelete)
@Override
public boolean doMove(URI srcUri, URI dstUri)
throws IOException {
if (copy(srcUri, dstUri)) {
if (copyDir(srcUri, dstUri)) {
return delete(srcUri, true);
}
return false;
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri);
Preconditions.checkState(exists(srcUri), "Source URI '%s' does not exist", srcUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,11 @@ public boolean move(URI srcUri, URI dstUri, boolean overwrite)
}
return doMove(srcUri, dstUri);
}

/**
* Actual move implementation for each PinotFS. It should not be directly called, instead use
* {@link PinotFS#move(URI, URI, boolean)}.
*
* @param srcUri URI of the source to move
* @param dstUri URI of the destination to move to
* @return the outcome of the move; {@code move(URI, URI, boolean)} returns this value to its caller unchanged
* @throws IOException declared so implementations can signal IO failures
*/
protected abstract boolean doMove(URI srcUri, URI dstUri)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
copy(toFile(srcUri), toFile(dstUri), false);
copy(toFile(srcUri), toFile(dstUri), true);
return true;
}

Expand Down Expand Up @@ -182,7 +182,7 @@ private static void copy(File srcFile, File dstFile, boolean recursive)
FileUtils.copyDirectory(srcFile, dstFile);
} else {
// Throws Exception on failure
throw new IOException(srcFile.getAbsolutePath() + " is a directory");
throw new IOException(srcFile.getAbsolutePath() + " is a directory and recursive copy is not enabled.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying the error message

}
} else {
// Will create parent directories, throws Exception on failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,25 @@ boolean move(URI srcUri, URI dstUri, boolean overwrite)
throws IOException;

/**
* Does the actual behavior of move in each FS.
* Copies the file from the src to dst. The original file is retained. If the dst has parent directories
* that haven't been created, this method will create all the necessary parent directories. If dst already exists,
* this will overwrite the existing file in the path.
*
* Note: In Pinot we recommend the full paths of both src and dst be specified.
* For example, if a file /a/b/c is copied to a file /x/y/z, the directory /a/b still exists containing the file 'c'.
* The dst file /x/y/z will contain the contents of 'c'.
* @param srcUri URI of the original file
* @param dstUri URI of the final file location
* @return true if copy is successful
* @throws IOException on IO failure
*/
boolean doMove(URI srcUri, URI dstUri)
throws IOException;
default boolean copy(URI srcUri, URI dstUri)
throws IOException {
// Plain copy handles single files only; anything that is not a directory is delegated to copyDir.
final boolean sourceIsDirectory = isDirectory(srcUri);
if (!sourceIsDirectory) {
return copyDir(srcUri, dstUri);
}
// Directories are rejected here; callers that want a directory copy must call copyDir directly.
throw new IllegalArgumentException("Recursive copy not supported");
}

/**
* Copies the file or directory from the src to dst. The original file is retained. If the dst has parent directories
Expand All @@ -108,7 +123,7 @@ boolean doMove(URI srcUri, URI dstUri)
* @return true if copy is successful
* @throws IOException on IO failure
*/
boolean copy(URI srcUri, URI dstUri)
boolean copyDir(URI srcUri, URI dstUri)
throws IOException;

/**
Expand Down Expand Up @@ -162,6 +177,7 @@ default void copyFromLocalDir(File srcFile, URI dstUri)
throws Exception {
throw new UnsupportedOperationException("Recursive copy not supported");
}

/**
* The src file is on the local disk. Add it to filesystem at the given dst name and the source is kept intact
* afterwards.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,49 @@ public void testFS()
// Expected.
}

// Check that copying a directory doesn't work with the default 'copy'
try {
localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
fail();
} catch (IllegalArgumentException e) {
// expected.
}

// Copying directory works with 'copyDir'
localPinotFS.copyDir(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(secondTempDir.toURI()));

// Copying directory with files to directory with files
File testFile = new File(firstTempDir, "testFile");
Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
File newTestFile = new File(secondTempDir, "newTestFile");
Assert.assertTrue(newTestFile.createNewFile(), "Could not create file " + newTestFile.getPath());

localPinotFS.copyDir(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 1);

// Copying directory with files under another directory.
File firstTempDirUnderSecondTempDir = new File(secondTempDir, firstTempDir.getName());
localPinotFS.copyDir(firstTempDir.toURI(), firstTempDirUnderSecondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(firstTempDirUnderSecondTempDir.toURI()));
// There are two files/directories under secondTempDir.
Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 2);
// The file under src directory also got copied under dst directory.
Assert.assertEquals(localPinotFS.listFiles(firstTempDirUnderSecondTempDir.toURI(), true).length, 1);

// Calling length() on a directory throws an exception
try {
localPinotFS.length(firstTempDir.toURI());
fail();
} catch (IllegalArgumentException e) {
// expected.
}

Assert.assertTrue(testFile.exists());

localPinotFS.copyFromLocalFile(testFile, secondTestFileUri);
Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
localPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
return false;
}
Expand Down