Skip to content

Commit

Permalink
ISSUE-9555 fix GcsPinotFS (#9556)
Browse files (browse the repository at this point in the history)
* ISSUE-9555 fix GcsPinotFS

* fix test

* add new line
  • Loading branch information
lfernandez93 authored Oct 18, 2022
1 parent d580bbc commit 4439197
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 26 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -99,7 +99,7 @@ public final boolean mkdir(URI uri)
if (directoryPath.equals(GcsUri.DELIMITER)) {
return true;
}
if (existsDirectory(gcsUri)) {
if (existsDirectoryOrBucket(gcsUri)) {
return true;
}
Blob blob = getBucket(gcsUri).create(directoryPath, new byte[0]);
Expand Down Expand Up @@ -231,7 +231,7 @@ public void copyFromLocalFile(File srcFile, URI dstUri)
/**
 * Reports whether {@code uri} is treated as a directory, by wrapping it in a {@link GcsUri}
 * and delegating to the private directory-existence check.
 *
 * NOTE(review): this listing is a rendered diff, so two return statements are shown. The
 * {@code existsDirectory} call appears to be the pre-change line and the
 * {@code existsDirectoryOrBucket} call its replacement -- confirm against the actual file.
 *
 * @param uri the location to test
 * @return the result of the delegated existence check
 * @throws IOException propagated from the delegated check
 */
@Override
public boolean isDirectory(URI uri)
throws IOException {
return existsDirectory(new GcsUri(uri));
return existsDirectoryOrBucket(new GcsUri(uri));
}

@Override
Expand Down Expand Up @@ -300,10 +300,10 @@ private boolean isPathTerminatedByDelimiter(GcsUri gcsUri) {
* @return true if the directory exists, or if the prefix is empty (i.e. the URI refers to the bucket itself)
* @throws IOException
*/
private boolean existsDirectory(GcsUri gcsUri)
private boolean existsDirectoryOrBucket(GcsUri gcsUri)
throws IOException {
String prefix = gcsUri.getPrefix();
if (prefix.equals(GcsUri.DELIMITER)) {
if (prefix.isEmpty()) {
return true;
}
Blob blob = getBucket(gcsUri).get(prefix);
Expand All @@ -322,7 +322,7 @@ private boolean existsDirectory(GcsUri gcsUri)

private boolean isEmptyDirectory(GcsUri gcsUri)
throws IOException {
if (!existsDirectory(gcsUri)) {
if (!existsDirectoryOrBucket(gcsUri)) {
return false;
}
String prefix = gcsUri.getPrefix();
Expand Down Expand Up @@ -363,7 +363,7 @@ private void visitFiles(GcsUri fileUri, boolean recursive, Consumer<Blob> visito

private boolean exists(GcsUri gcsUri)
throws IOException {
if (existsDirectory(gcsUri)) {
if (existsDirectoryOrBucket(gcsUri)) {
return true;
}
if (isPathTerminatedByDelimiter(gcsUri)) {
Expand All @@ -378,7 +378,7 @@ private boolean delete(GcsUri segmentUri, boolean forceDelete)
if (!exists(segmentUri)) {
return forceDelete;
}
if (existsDirectory(segmentUri)) {
if (existsDirectoryOrBucket(segmentUri)) {
if (!forceDelete && !isEmptyDirectory(segmentUri)) {
return false;
}
Expand Down Expand Up @@ -441,7 +441,7 @@ private boolean copy(GcsUri srcUri, GcsUri dstUri)
return true;
}
// copy directly if source is a single file.
if (!existsDirectory(srcUri)) {
if (!existsDirectoryOrBucket(srcUri)) {
return copyFile(srcUri, dstUri);
}
// copy directory
Expand All @@ -456,7 +456,7 @@ private boolean copy(GcsUri srcUri, GcsUri dstUri)
*
* @see https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork
*/
if (!existsDirectory(dstUri)) {
if (!existsDirectoryOrBucket(dstUri)) {
mkdir(dstUri.getUri());
}
boolean copySucceeded = true;
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -40,7 +40,9 @@
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static java.lang.String.format;
Expand Down Expand Up @@ -75,26 +77,44 @@
public class GcsPinotFSTest {
private static final String DATA_DIR_PREFIX = "testing-data";

private String _keyFile;
private String _projectId;
private String _bucket;
private GcsPinotFS _pinotFS;
private GcsUri _dataDir;
private Path _localTmpDir;
private final Closer _closer = Closer.create();

@BeforeClass
public void setup() {
String keyFile = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
String projectId = System.getenv("GCP_PROJECT");
String bucket = System.getenv("GCS_BUCKET");
if (keyFile != null && projectId != null && bucket != null) {
_keyFile = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
_projectId = System.getenv("GCP_PROJECT");
_bucket = System.getenv("GCS_BUCKET");
if (_keyFile != null && _projectId != null && _bucket != null) {
_pinotFS = new GcsPinotFS();
_pinotFS.init(new PinotConfiguration(
ImmutableMap.<String, Object>builder().put(PROJECT_ID, projectId).put(GCP_KEY, keyFile).build()));
_dataDir = createGcsUri(bucket, DATA_DIR_PREFIX + randomUUID());
ImmutableMap.<String, Object>builder().put(PROJECT_ID, _projectId).put(GCP_KEY, _keyFile).build()));
}
}

/**
 * Class-level teardown (TestNG {@code @AfterClass}): closes the shared {@code _closer}.
 *
 * @throws Exception if closing {@code _closer} fails
 */
@AfterClass
public void tearDown()
throws Exception {
_closer.close();
}

/**
 * Per-test setup (TestNG {@code @BeforeMethod}). When {@code _pinotFS} has been initialised,
 * assigns a new GCS data-directory URI and a local temp directory for the upcoming test
 * method. Does nothing when {@code _pinotFS} is null.
 *
 * @throws Exception declared for the helper calls made here
 */
@BeforeMethod
public void beforeTest()
throws Exception {
if (_pinotFS != null) {
// A random UUID suffix gives every test method its own directory name under the bucket.
_dataDir = createGcsUri(_bucket, DATA_DIR_PREFIX + randomUUID());
// NOTE(review): presumably creates a new temporary directory on local disk -- helper not visible here.
_localTmpDir = createLocalTempDirectory();
}
}

@AfterMethod
public void afterTest()
throws Exception {
if (_pinotFS != null) {
_pinotFS.delete(_dataDir.getUri(), true);
_closer.close();
Expand Down Expand Up @@ -157,12 +177,11 @@ public void testGcs()
throws Exception {
skipIfNotConfigured();
// Create empty file
Path localTmpDir = createLocalTempDirectory();
Path emptyFile = localTmpDir.resolve("empty");
Path emptyFile = _localTmpDir.resolve("empty");
emptyFile.toFile().createNewFile();

// Create non-empty file
Path file1 = localTmpDir.resolve("file1");
Path file1 = _localTmpDir.resolve("file1");
List<String> expectedLinesFromFile = writeToFile(file1, 10);
List<String> actualLinesFromFile = Files.readAllLines(file1, UTF_8);
// Sanity check
Expand Down Expand Up @@ -203,7 +222,7 @@ public void testGcs()
assertTrue(
listFilesToStream(createGcsUri(_dataDir.getBucketName(), "")).collect(toSet()).containsAll(expectedElements));
// Check that the non-empty file has the expected contents
Path nonEmptyFileFromGcs = localTmpDir.resolve("nonEmptyFileFromGcs");
Path nonEmptyFileFromGcs = _localTmpDir.resolve("nonEmptyFileFromGcs");
_pinotFS.copyToLocalFile(nonEmptyFileGcsUri.getUri(), nonEmptyFileFromGcs.toFile());
assertEquals(Files.readAllLines(nonEmptyFileFromGcs), expectedLinesFromFile);

Expand All @@ -226,8 +245,8 @@ public void testGcs()
String directoryName = Paths.get(gcsDirectoryUri.getPath()).getFileName().toString();
String directoryCopyName = Paths.get(gcsDirectoryUriCopy.getPath()).getFileName().toString();
for (GcsUri element : ImmutableList.copyOf(expectedElements)) {
expectedElementsCopy
.add(createGcsUri(element.getBucketName(), element.getPath().replace(directoryName, directoryCopyName)));
expectedElementsCopy.add(
createGcsUri(element.getBucketName(), element.getPath().replace(directoryName, directoryCopyName)));
}
expectedElementsCopy.addAll(expectedElements);
assertEquals(listFilesToStream(_dataDir).collect(toSet()), expectedElementsCopy);
Expand All @@ -253,8 +272,7 @@ public void testListFilesWithMetadata()
skipIfNotConfigured();

// Create empty file
Path localTmpDir = createLocalTempDirectory();
Path emptyFile = localTmpDir.resolve("empty");
Path emptyFile = _localTmpDir.resolve("empty");
emptyFile.toFile().createNewFile();

// Create 5 subfolders with files inside.
Expand Down Expand Up @@ -295,9 +313,8 @@ public void testListFilesWithMetadata()
Assert.assertEquals(fileMetadata.size(), count + 2);
Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
Assert.assertEquals(fileMetadata.stream().filter(f -> !f.isDirectory()).count(), 1);
Assert.assertTrue(expectedNonRecursive
.containsAll(fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())),
fileMetadata.toString());
Assert.assertTrue(expectedNonRecursive.containsAll(
fileMetadata.stream().map(FileMetadata::getFilePath).collect(Collectors.toSet())), fileMetadata.toString());
fileMetadata = _pinotFS.listFilesWithMetadata(_dataDir.getUri(), true);
Assert.assertEquals(fileMetadata.size(), count * 2 + 2);
Assert.assertEquals(fileMetadata.stream().filter(FileMetadata::isDirectory).count(), count + 1);
Expand Down

0 comments on commit 4439197

Please sign in to comment.