Skip to content

Commit

Permalink
add copyDir API to copy directory within a PinotFS
Browse files Browse the repository at this point in the history
  • Loading branch information
Rong Rong committed Feb 18, 2022
1 parent e7f806b commit b20d06d
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public boolean doMove(URI srcUri, URI dstUri)
public boolean copy(URI srcUri, URI dstUri)
throws IOException {
try {
return _localPinotFS.copy(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
return _localPinotFS.copyDir(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath()));
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public boolean doMove(URI srcUri, URI dstUri)
* @return true if move succeeds else false.
*/
@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
LOGGER.debug("copy is called with srcUri='{}', dstUri='{}'", srcUri, dstUri);
// If src and dst are the same, do nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
if (isDirectory(srcUri)) {
throw new UnsupportedOperationException("Azure FS doesn't support directory recursive copy!");
}

if (exists(dstUri)) {
delete(dstUri, true);
}
Expand All @@ -126,9 +130,8 @@ public boolean copy(URI srcUri, URI dstUri)
inputStream.close();
outputStream.close();
} catch (IOException e) {
LOGGER
.error("Exception encountered during copy, input: '{}', output: '{}'.", srcUri.toString(), dstUri.toString(),
e);
LOGGER.error("Exception encountered during copy, input: '{}', output: '{}'.", srcUri, dstUri, e);
return false;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ public boolean doMove(URI srcUri, URI dstUri)
GcsUri srcGcsUri = new GcsUri(srcUri);
GcsUri dstGcsUri = new GcsUri(dstUri);
if (copy(srcGcsUri, dstGcsUri)) {
// Only delete if all files were successfully moved
// Only delete if all files were successfully copied
return delete(srcGcsUri, true);
}
return false;
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri);
return copy(new GcsUri(srcUri), new GcsUri(dstUri));
Expand Down Expand Up @@ -414,9 +414,11 @@ private boolean copy(GcsUri srcUri, GcsUri dstUri)
if (srcUri.equals(dstUri)) {
return true;
}
// copy directly if source is a single file.
if (!existsDirectory(srcUri)) {
return copyFile(srcUri, dstUri);
}
// copy directory
if (srcUri.hasSubpath(dstUri) || dstUri.hasSubpath(srcUri)) {
throw new IOException(String.format("Cannot copy from or to a subdirectory: '%s' -> '%s'", srcUri, dstUri));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testGcs()

// Test copy directory -> directory
GcsUri gcsDirectoryUriCopy = createTempDirectoryGcsUri();
_pinotFS.copy(gcsDirectoryUri.getUri(), gcsDirectoryUriCopy.getUri());
_pinotFS.copyDir(gcsDirectoryUri.getUri(), gcsDirectoryUriCopy.getUri());

Set<GcsUri> expectedElementsCopy = new HashSet<>();
String directoryName = Paths.get(gcsDirectoryUri.getPath()).getFileName().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean doMove(URI srcUri, URI dstUri)
* need to create a new configuration and filesystem. Keeps files if copy/move is partial.
*/
@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
Path source = new Path(srcUri);
Path target = new Path(dstUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testCopy()
hadoopFS.touch(new Path(baseURI.getPath(), "src/dir/2").toUri());
String[] srcFiles = hadoopFS.listFiles(new Path(baseURI.getPath(), "src").toUri(), true);
Assert.assertEquals(srcFiles.length, 3);
hadoopFS.copy(new Path(baseURI.getPath(), "src").toUri(), new Path(baseURI.getPath(), "dest").toUri());
hadoopFS.copyDir(new Path(baseURI.getPath(), "src").toUri(), new Path(baseURI.getPath(), "dest").toUri());
Assert.assertTrue(hadoopFS.exists(new Path(baseURI.getPath(), "dest").toUri()));
Assert.assertTrue(hadoopFS.exists(new Path(baseURI.getPath(), "dest/dir").toUri()));
Assert.assertTrue(hadoopFS.exists(new Path(baseURI.getPath(), "dest/dir/1").toUri()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri);
Preconditions.checkState(exists(srcUri), "Source URI '%s' does not exist", srcUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,11 @@ public boolean move(URI srcUri, URI dstUri, boolean overwrite)
}
return doMove(srcUri, dstUri);
}

/**
* Actual move implementation for each PinotFS. It should not be directly called, instead use
* {@link PinotFS#move(URI, URI, boolean)}.
*/
protected abstract boolean doMove(URI srcUri, URI dstUri)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
copy(toFile(srcUri), toFile(dstUri), false);
copy(toFile(srcUri), toFile(dstUri), true);
return true;
}

Expand Down Expand Up @@ -182,7 +182,7 @@ private static void copy(File srcFile, File dstFile, boolean recursive)
FileUtils.copyDirectory(srcFile, dstFile);
} else {
// Throws Exception on failure
throw new IOException(srcFile.getAbsolutePath() + " is a directory");
throw new IOException(srcFile.getAbsolutePath() + " is a directory and recursive copy is not enabled.");
}
} else {
// Will create parent directories, throws Exception on failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,25 @@ boolean move(URI srcUri, URI dstUri, boolean overwrite)
throws IOException;

/**
* Does the actual behavior of move in each FS.
* Copies the file from the src to dst. The original file is retained. If the dst has parent directories
* that haven't been created, this method will create all the necessary parent directories. If dst already exists,
* this will overwrite the existing file in the path.
*
* Note: In Pinot we recommend the full paths of both src and dst be specified.
* For example, if a file /a/b/c is copied to a file /x/y/z, the directory /a/b still exists containing the file 'c'.
* The dst file /x/y/z will contain the contents of 'c'.
* @param srcUri URI of the original file
* @param dstUri URI of the final file location
* @return true if copy is successful
* @throws IOException on IO failure
*/
boolean doMove(URI srcUri, URI dstUri)
throws IOException;
default boolean copy(URI srcUri, URI dstUri)
throws IOException {
if (isDirectory(srcUri)) {
throw new UnsupportedOperationException("Recursive copy not supported");
}
return copyDir(srcUri, dstUri);
}

/**
* Copies the file or directory from the src to dst. The original file is retained. If the dst has parent directories
Expand All @@ -108,8 +123,10 @@ boolean doMove(URI srcUri, URI dstUri)
* @return true if copy is successful
* @throws IOException on IO failure
*/
boolean copy(URI srcUri, URI dstUri)
throws IOException;
default boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
throw new UnsupportedOperationException("Recursive copy not supported");
}

/**
* Checks whether the file or directory at the provided location exists.
Expand Down Expand Up @@ -162,6 +179,7 @@ default void copyFromLocalDir(File srcFile, URI dstUri)
throws Exception {
throw new UnsupportedOperationException("Recursive copy not supported");
}

/**
* The src file is on the local disk. Add it to filesystem at the given dst name and the source is kept intact
* afterwards.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,47 @@ public void testFS()
// Expected.
}

// Check that directory only copy worked
try {
localPinotFS.copyDir(firstTempDir.toURI(), secondTempDir.toURI());
fail();
} catch (IOException e) {
// expected.
}
localPinotFS.copyDir(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(secondTempDir.toURI()));

// Copying directory with files to directory with files
File testFile = new File(firstTempDir, "testFile");
Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
File newTestFile = new File(secondTempDir, "newTestFile");
Assert.assertTrue(newTestFile.createNewFile(), "Could not create file " + newTestFile.getPath());

localPinotFS.copyDir(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 1);

// Copying directory with files under another directory.
File firstTempDirUnderSecondTempDir = new File(secondTempDir, firstTempDir.getName());
localPinotFS.copyDir(firstTempDir.toURI(), firstTempDirUnderSecondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(firstTempDirUnderSecondTempDir.toURI()));
// There're two files/directories under secondTempDir.
Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 2);
// The file under src directory also got copied under dst directory.
Assert.assertEquals(localPinotFS.listFiles(firstTempDirUnderSecondTempDir.toURI(), true).length, 1);

// len of dir = exception
try {
localPinotFS.length(firstTempDir.toURI());
fail();
} catch (IllegalArgumentException e) {
// expected.
}

Assert.assertTrue(testFile.exists());

localPinotFS.copyFromLocalFile(testFile, secondTestFileUri);
Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
localPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public boolean doMove(URI srcUri, URI dstUri)
}

@Override
public boolean copy(URI srcUri, URI dstUri)
public boolean copyDir(URI srcUri, URI dstUri)
throws IOException {
return false;
}
Expand Down

0 comments on commit b20d06d

Please sign in to comment.