Skip to content

Commit

Permalink
[NOID] Fixes #4207: Integration Tests for Load Procedures with Cloud …
Browse files Browse the repository at this point in the history
…Object Storage (#4226)

* Fixes #4207: Integration Tests for Load Procedures with Cloud Object Storage

* added google cloud tests

* wip - adding other procs

* cleanup

* added jsonParams proc

* fix tests

* fix extended-it tests

* cleanup

* fix NoFileFound errors

* fixed tests due to missing apoc config

* added ignored export tests
  • Loading branch information
vga91 committed Jan 15, 2025
1 parent 6228fae commit dd63ee3
Show file tree
Hide file tree
Showing 42 changed files with 2,620 additions and 295 deletions.
143 changes: 18 additions & 125 deletions core/src/test/java/apoc/export/arrow/ArrowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,17 @@
import apoc.util.JsonUtil;
import apoc.util.TestUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.File;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.graphdb.Result;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

import java.io.File;


public class ArrowTest {

private static File directory = new File("target/arrow import");
Expand All @@ -65,75 +57,11 @@ public class ArrowTest {
directory.toPath().toAbsolutePath())
.withSetting(ApocSettings.apoc_export_file_enabled, true);

public static final List<Map<String, Object>> EXPECTED = List.of(
new HashMap<>() {
{
put("name", "Adam");
put("bffSince", null);
put("<source.id>", null);
put("<id>", 0L);
put("age", 42L);
put("labels", List.of("User"));
put("male", true);
put("<type>", null);
put("kids", List.of("Sam", "Anna", "Grace"));
put(
"place",
Map.of("crs", "wgs-84-3d", "longitude", 33.46789D, "latitude", 13.1D, "height", 100.0D));
put("<target.id>", null);
put("since", null);
put(
"born",
LocalDateTime.parse("2015-05-18T19:32:24.000")
.atOffset(ZoneOffset.UTC)
.toZonedDateTime());
}
},
new HashMap<>() {
{
put("name", "Jim");
put("bffSince", null);
put("<source.id>", null);
put("<id>", 1L);
put("age", 42L);
put("labels", List.of("User"));
put("male", null);
put("<type>", null);
put("kids", null);
put("place", null);
put("<target.id>", null);
put("since", null);
put("born", null);
}
},
new HashMap<>() {
{
put("name", null);
put("bffSince", "P5M1DT12H");
put("<source.id>", 0L);
put("<id>", 0L);
put("age", null);
put("labels", null);
put("male", null);
put("<type>", "KNOWS");
put("kids", null);
put("place", null);
put("<target.id>", 1L);
put("since", 1993L);
put("born", null);
}
});


@BeforeClass
public static void beforeClass() {
db.executeTransactionally(
"CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})");
TestUtil.registerProcedure(db, ExportArrow.class, LoadArrow.class, Graphs.class, Meta.class);
}

@AfterClass
public static void teardown() {
db.shutdown();
initDbCommon(db);
}

@Before
Expand All @@ -142,21 +70,13 @@ public void before() {
apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true);
}

private byte[] extractByteArray(Result result) {
return result.<byte[]>columnAs("byteArray").next();
}

private String extractFileName(Result result) {
return result.<String>columnAs("file").next();
@AfterClass
public static void teardown() {
db.shutdown();
}

private <T> T readValue(String json, Class<T> clazz) {
if (json == null) return null;
try {
return JsonUtil.OBJECT_MAPPER.readValue(json, clazz);
} catch (JsonProcessingException e) {
return null;
}
private byte[] extractByteArray(Result result) {
return result.<byte[]>columnAs("byteArray").next();
}

@Test
Expand Down Expand Up @@ -215,7 +135,7 @@ public void testFileRoundtripArrowQuery() {
String file = db.executeTransactionally(
"CALL apoc.export.arrow.query('query_test.arrow', $query) YIELD file",
Map.of("query", returnQuery),
this::extractFileName);
ArrowTestUtil::extractFileName);

// then
final String query = "CALL apoc.load.arrow($file) YIELD value " + "RETURN value";
Expand Down Expand Up @@ -251,22 +171,7 @@ public void testStreamRoundtripArrowGraph() {

// then
final String query = "CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value";
db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> {
final List<Map<String, Object>> actual = getActual(result);
assertEquals(EXPECTED, actual);
return null;
});
}

private List<Map<String, Object>> getActual(Result result) {
return result.stream()
.map(m -> (Map<String, Object>) m.get("value"))
.map(m -> {
final Map<String, Object> newMap = new HashMap(m);
newMap.put("place", readValue((String) m.get("place"), Map.class));
return newMap;
})
.collect(Collectors.toList());
testLoadArrow(db, query, Map.of("byteArray", byteArray));
}

@Test
Expand All @@ -277,15 +182,11 @@ public void testFileRoundtripArrowGraph() {
+ "CALL apoc.export.arrow.graph('graph_test.arrow', graph) YIELD file "
+ "RETURN file",
Map.of(),
this::extractFileName);
ArrowTestUtil::extractFileName);

// then
final String query = "CALL apoc.load.arrow($file) YIELD value " + "RETURN value";
db.executeTransactionally(query, Map.of("file", file), result -> {
final List<Map<String, Object>> actual = getActual(result);
assertEquals(EXPECTED, actual);
return null;
});
testLoadArrow(db, query, Map.of("file", file));
}

@Test
Expand All @@ -310,26 +211,18 @@ private void testStreamRoundtripAllCommon() {

// then
final String query = "CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value";
db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> {
final List<Map<String, Object>> actual = getActual(result);
assertEquals(EXPECTED, actual);
return null;
});
testLoadArrow(db, query, Map.of("byteArray", byteArray));
}

@Test
public void testFileRoundtripArrowAll() {
// given - when
String file = db.executeTransactionally(
"CALL apoc.export.arrow.all('all_test.arrow') YIELD file", Map.of(), this::extractFileName);
"CALL apoc.export.arrow.all('all_test.arrow') YIELD file", Map.of(), ArrowTestUtil::extractFileName);

// then
final String query = "CALL apoc.load.arrow($file) YIELD value " + "RETURN value";
db.executeTransactionally(query, Map.of("file", file), result -> {
final List<Map<String, Object>> actual = getActual(result);
assertEquals(EXPECTED, actual);
return null;
});
testLoadArrow(db, query, Map.of("file", file));
}

@Test
Expand Down Expand Up @@ -365,7 +258,7 @@ public void testFileVolumeArrowAll() {
String file = db.executeTransactionally(
"CALL apoc.export.arrow.query('volume_test.arrow', 'MATCH (n:ArrowNode) RETURN n.id AS id') YIELD file ",
Map.of(),
this::extractFileName);
ArrowTestUtil::extractFileName);

final List<Long> expected = LongStream.range(0, 10000).mapToObj(l -> l).collect(Collectors.toList());

Expand Down
9 changes: 9 additions & 0 deletions core/src/test/java/apoc/util/s3/S3TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,14 @@ public static void assertS3KeyEventually(Runnable runnable) {
v -> v,
30L,
TimeUnit.SECONDS);
}

public static String removeRegionFromUrl(S3Container s3Container, String url) {
return url.replace(s3Container.getEndpointConfiguration().getSigningRegion() + ".", "");
}

public static String putToS3AndGetUrl(S3Container s3Container, String filename) {
String url = s3Container.putFile(filename);
return removeRegionFromUrl(s3Container, url);
}
}
62 changes: 62 additions & 0 deletions extended-it/src/test/java/apoc/azure/ArrowAzureStorageTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package apoc.azure;

import apoc.export.arrow.ArrowTestUtil;
import apoc.util.s3.S3BaseTest;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.configuration.GraphDatabaseInternalSettings;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

import java.util.Map;

import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED;
import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED;
import static apoc.ApocConfig.apocConfig;
import static apoc.export.arrow.ArrowTestUtil.initDbCommon;
import static apoc.export.arrow.ArrowTestUtil.testImportCommon;
import static apoc.export.arrow.ArrowTestUtil.testLoadArrow;

@Ignore("This test won't work until the Azure Storage files will be correctly handled via FileUtils, placed in APOC Core")
public class ArrowAzureStorageTest extends AzureStorageBaseTest {

@Rule
public DbmsRule db = new ImpermanentDbmsRule()
.withSetting(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true);

@Before
public void beforeClass() {
initDbCommon(db);
apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true);
apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true);
}

@Test
public void testFileRoundtripWithLoadArrow() {
String url = putToAzureStorageAndGetUrl("test_all.arrow");

String file = db.executeTransactionally("CALL apoc.export.arrow.all($url) YIELD file",
Map.of("url", url),
ArrowTestUtil::extractFileName);

// check that the exported file is correct
final String query = "CALL apoc.load.arrow($file, {})";
testLoadArrow(db, query, Map.of("file", file));
}


@Test
public void testFileRoundtripWithImportArrow() {
db.executeTransactionally("CREATE (:Another {foo:1, listInt: [1,2]}), (:Another {bar:'Sam'})");

String url = putToAzureStorageAndGetUrl("test_all_import.arrow");
String file = db.executeTransactionally("CALL apoc.export.arrow.all($url) YIELD file",
Map.of("url", url),
ArrowTestUtil::extractFileName);

// check that the exported file is correct
testImportCommon(db, file, ArrowTestUtil.MAPPING_ALL);
}
}
74 changes: 74 additions & 0 deletions extended-it/src/test/java/apoc/azure/AzureStorageBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package apoc.azure;

import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.sas.BlobSasPermission;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.UUID;

public class AzureStorageBaseTest {

public static GenericContainer<?> azuriteContainer;
public static BlobContainerClient containerClient;

@BeforeClass
public static void setUp() throws Exception {
DockerImageName azuriteImg = DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite");
azuriteContainer = new GenericContainer<>(azuriteImg)
.withExposedPorts(10000);

azuriteContainer.start();

var accountName = "devstoreaccount1";
var accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
var blobEndpoint = "http://%s:%d/%s".formatted(azuriteContainer.getHost(), azuriteContainer.getMappedPort(10000), accountName);
var connectionString = "DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;"
.formatted(accountName, accountKey, blobEndpoint);

containerClient = new BlobContainerClientBuilder()
.connectionString(connectionString)
.containerName("test-container")
.buildClient();
containerClient.create();
}

@AfterClass
public static void teardown() {
azuriteContainer.close();
}

public static String putToAzureStorageAndGetUrl(String url) {
try {
File file = new File(url);
byte[] content = FileUtils.readFileToByteArray(file);

var blobClient = getBlobClient(content);
BlobSasPermission permission = new BlobSasPermission().setReadPermission(true);
OffsetDateTime expiryTime = OffsetDateTime.now().plusHours(1);
String sasToken = blobClient.generateSas(new BlobServiceSasSignatureValues(expiryTime, permission), new Context("Azure-Storage-Log-String-To-Sign", "true"));
return blobClient.getBlobUrl() + "?" + sasToken;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public static BlobClient getBlobClient(byte[] content) {
var blobName = "blob-" + UUID.randomUUID();
var blobClient = containerClient.getBlobClient(blobName);
blobClient.upload(new ByteArrayInputStream(content));
return blobClient;
}

}
Loading

0 comments on commit dd63ee3

Please sign in to comment.