From 605f7f4fc794ce426a2851b2c7ee962248ac2a4f Mon Sep 17 00:00:00 2001 From: Giuseppe Villani Date: Mon, 16 Dec 2024 12:50:51 +0100 Subject: [PATCH 1/4] [NOID] Fixes #4244: Make apoc.dv.* procedures work in clusters (#4281) * Fixes #4244: Make apoc.dv.* procedures work in clusters * added procs to extended*.txt * fix tests * cleanup --- .../overview/apoc.dv/apoc.dv.catalog.add.adoc | 4 +- .../apoc.dv/apoc.dv.catalog.list.adoc | 4 +- .../apoc.dv/apoc.dv.catalog.remove.adoc | 4 +- .../ROOT/pages/virtual-resource/index.adoc | 17 +- .../modules/ROOT/partials/dv/deprecated.adoc | 19 ++ ...VirtualizationCatalogNewProcedureTest.java | 221 +++++++++++++ .../it/dv/DataVirtualizationCatalogTest.java | 198 ++++++++++++ .../dv/DataVirtualizationCatalogTestUtil.java | 297 ++++++++++++++++++ .../apoc/dv/DataVirtualizationCatalog.java | 21 +- ...ualizationCatalogHandlerNewProcedures.java | 60 ++++ ...ataVirtualizationCatalogNewProcedures.java | 74 +++++ full/src/main/resources/extended.txt | 3 + 12 files changed, 907 insertions(+), 15 deletions(-) create mode 100644 docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc create mode 100644 full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogNewProcedureTest.java create mode 100644 full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTest.java create mode 100644 full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTestUtil.java create mode 100644 full/src/main/java/apoc/dv/DataVirtualizationCatalogHandlerNewProcedures.java create mode 100644 full/src/main/java/apoc/dv/DataVirtualizationCatalogNewProcedures.java diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc index ede714a2c9..e393700d58 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.add.adoc @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it! = apoc.dv.catalog.add :description: This section contains reference documentation for the apoc.dv.catalog.add procedure. -label:procedure[] label:apoc-full[] +label:procedure[] label:apoc-full[] label:deprecated[] [.emphasis] Add a virtualized resource configuration @@ -17,6 +17,8 @@ Add a virtualized resource configuration apoc.dv.catalog.add(name :: STRING?, config = {} :: MAP?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?) ---- +include::partial$/dv/deprecated.adoc[] + [WARNING] ==== This procedure is not intended to be used in a cluster environment, and may act unpredictably. diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc index 3fffd24690..a1f1ae7bc6 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.list.adoc @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it! = apoc.dv.catalog.list :description: This section contains reference documentation for the apoc.dv.catalog.list procedure. -label:procedure[] label:apoc-full[] +label:procedure[] label:apoc-full[] label:deprecated[] [.emphasis] List all virtualized resource configuration @@ -17,6 +17,8 @@ List all virtualized resource configuration apoc.dv.catalog.list() :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?) ---- +include::partial$/dv/deprecated.adoc[] + == Output parameters [.procedures, opts=header] |=== diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc index 2b0165ecb4..261d80428d 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.dv/apoc.dv.catalog.remove.adoc @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it! = apoc.dv.catalog.remove :description: This section contains reference documentation for the apoc.dv.catalog.remove procedure. -label:procedure[] label:apoc-full[] +label:procedure[] label:apoc-full[] label:deprecated[] [.emphasis] Remove a virtualized resource config by name @@ -17,6 +17,8 @@ Remove a virtualized resource config by name apoc.dv.catalog.remove(name :: STRING?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?) ---- +include::partial$/dv/deprecated.adoc[] + [WARNING] ==== This procedure is not intended to be used in a cluster environment, and may act unpredictably. diff --git a/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc b/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc index 57db73eb2b..0b9b7384b0 100644 --- a/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc +++ b/docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc @@ -2,6 +2,8 @@ = Virtual Resource :description: This chapter describes how to handle external data sources as virtual resource without persisting them in the database +include::partial$systemdbonly.note.adoc[] + [NOTE] ==== There are situations where we would like to enrich/complement the results of a cypher query in a Neo4j graph with additional @@ -40,10 +42,11 @@ image::apoc.dv.imported-graph-from-RDB.png[scaledwidth="100%"] == Managing a Virtualized Resource via JDBC === Creating a Virtualized Resource (JDBC) -Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.add` procedure. -The procedure takes two parameters: +Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.install` procedure. +The procedure takes three parameters: * a name that uniquely identifies the virtualized resource and can be used to query that resource +* the database name where we want to use the resource (default is `'neo4j'`) * a set of parameters indicating the type of the resource (type), the access point (url), the parameterised query that will be run on the access point (query) and the labels that will be applied to the generated virtual nodes (labels). @@ -56,7 +59,7 @@ Here is the cypher that creates such virtualized resource: [source,cypher] ---- -CALL apoc.dv.catalog.add("fr-towns-by-dept", { +CALL apoc.dv.catalog.install("fr-towns-by-dept", "neo4j", { type: "JDBC", url: "jdbc:postgresql://localhost/communes?user=jb&password=jb", labels: ["Town","PopulatedPlace"], @@ -124,11 +127,11 @@ RETURN path ---- === Listing the Virtualized Resource Catalog -The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It takes no parameters. +The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It accepts one parameter: i.e. the database name where we want to use the resource (default is 'neo4j'). [source,cypher] ---- -CALL apoc.dv.catalog.list() +CALL apoc.dv.catalog.show() ---- === Removing Virtualized Resources from the Catalog @@ -136,7 +139,7 @@ When a Virtualized Resource is no longer needed it can be removed from the catal [source,cypher] ---- -CALL apoc.dv.catalog.remove("vr-name") +CALL apoc.dv.catalog.drop("vr-name", ) ---- === Export metadata @@ -165,7 +168,7 @@ Here is the cypher that creates such virtualized resource: [source,cypher] ---- -CALL apoc.dv.catalog.add("prod-details-by-id", { +CALL apoc.dv.catalog.install("prod-details-by-id", "neo4j", { type: "CSV", url: "http://data.neo4j.com/northwind/products.csv", labels: ["ProductDetails"], diff --git a/docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc b/docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc new file mode 100644 index 0000000000..be2f21d8c6 --- /dev/null +++ b/docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc @@ -0,0 +1,19 @@ +[WARNING] +==== +Please note that this procedure is deprecated. + +Use the following ones instead, which allow for better support in a cluster: + +[opts="header"] +|=== +| deprecated procedure | new procedure +| `apoc.dv.catalog.add(, $config)` | `apoc.dv.catalog.install('', '', $config)` +| `apoc.dv.catalog.remove('')` | `apoc.dv.catalog.drop('', '')` +| `apoc.dv.catalog.list()` | `apoc.dv.catalog.show('')` +|=== + +where `` is the database where we want to execute the procedure + +xref::virtual-resource/index.adoc[See here for more info]. + +==== \ No newline at end of file diff --git a/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogNewProcedureTest.java b/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogNewProcedureTest.java new file mode 100644 index 0000000000..34b09b24b6 --- /dev/null +++ b/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogNewProcedureTest.java @@ -0,0 +1,221 @@ +package apoc.full.it.dv; + +import apoc.create.Create; +import apoc.dv.DataVirtualizationCatalog; +import apoc.dv.DataVirtualizationCatalogNewProcedures; +import apoc.load.Jdbc; +import apoc.load.LoadCsv; +import apoc.util.TestUtil; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.dbms.api.DatabaseManagementService; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Result; +import org.neo4j.test.TestDatabaseManagementServiceBuilder; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.MySQLContainer; + +import java.io.File; +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; + +import static apoc.full.it.dv.DataVirtualizationCatalogTestUtil.*; +import static apoc.util.MapUtil.map; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testCallCount; + +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class DataVirtualizationCatalogNewProcedureTest { + private static final String DATABASE_NAME = "databaseName"; + private static GraphDatabaseService sysDb; + private static GraphDatabaseService db; + private static DatabaseManagementService databaseManagementService; + + public static JdbcDatabaseContainer mysql; + + @Rule + public TemporaryFolder storeDir = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + databaseManagementService = new TestDatabaseManagementServiceBuilder(storeDir.getRoot().toPath()) + .build(); + db = databaseManagementService.database(GraphDatabaseSettings.DEFAULT_DATABASE_NAME); + sysDb = databaseManagementService.database(GraphDatabaseSettings.SYSTEM_DATABASE_NAME); + + FileUtils.copyFile(new File(new URI(FILE_URL).toURL().getPath()), new File(storeDir.getRoot(), CSV_TEST_FILE)); + + TestUtil.registerProcedure(sysDb, DataVirtualizationCatalogNewProcedures.class); + TestUtil.registerProcedure(db, DataVirtualizationCatalog.class, Jdbc.class, LoadCsv.class, Create.class); + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + } + + @BeforeClass + public static void setUpContainer() { + mysql = new MySQLContainer().withInitScript("init_mysql.sql"); + mysql.start(); + } + + @AfterClass + public static void tearDownContainer() { + mysql.stop(); + } + + @Test + public void testVirtualizeCSV() { + getVirtualizeCSVCommonResult(db, + APOC_DV_INSTALL_QUERY, APOC_DV_SHOW_QUERY, CSV_TEST_FILE, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, CONFIG_VALUE), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContent); + } + + @Test + public void testVirtualizeCSVWithCustomDirectionIN() { + getVirtualizeCSVCommonResult(db, + APOC_DV_INSTALL_QUERY, APOC_DV_SHOW_QUERY, CSV_TEST_FILE, sysDb); + + Map config = new HashMap<>(CONFIG_VALUE); + config.put(DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name()); + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, config), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContentDirectionIN); + } + + @Test + public void testVirtualizeJDBC() { + + getVirtualizeJDBCCommonResult(db, mysql, + APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql))), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); + } + + @Test + public void testVirtualizeJDBCWithCustomDirectionIN() { + + getVirtualizeJDBCCommonResult(db, mysql, + APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); + } + + @Test + public void testVirtualizeJDBCWithParameterMap() { + + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); + } + + @Test + public void testVirtualizeJDBCWithParameterMapAndDirectionIN() { + + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_INSTALL_QUERY, sysDb); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); + } + + @Test + public void testRemove() { + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, + map(DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY))); + + testCallCount(sysDb, APOC_DV_DROP_QUERY, map(DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME), 0); + } + + @Test + public void testNameAsKey() { + Map params = map( + DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME, + NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY) + ); + + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, params); + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, params); + testResult(sysDb, APOC_DV_SHOW_QUERY, + (result) -> assertEquals(1, result.stream().count())); + } + + @Test + public void testJDBCQueryWithMixedParamsTypes() { + try { + sysDb.executeTransactionally(APOC_DV_INSTALL_QUERY, + map( + DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME, + "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY_WITH_PARAM) + ) + ); + Assert.fail("Exception is expected"); + } catch (Exception e) { + final Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + assertEquals("The query is mixing parameters with `$` and `?` please use just one notation", rootCause.getMessage()); + } + } + + @Test + public void testVirtualizeJDBCWithDifferentParameterMap() { + final String url = getVirtualizeJDBCUrl(mysql); + final List expectedParams = List.of("$name", "$head_of_state", "$CODE2"); + final List sortedExpectedParams = expectedParams.stream() + .sorted() + .toList(); + testCall(sysDb, APOC_DV_INSTALL_QUERY, + map(DATABASE_NAME, GraphDatabaseSettings.DEFAULT_DATABASE_NAME,NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, VIRTUALIZE_JDBC_WITH_PARAMS_QUERY)), + (row) -> assertDvCatalogAddOrInstall(row, url)); + + try { + db.executeTransactionally(APOC_DV_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_WRONG_PARAMS, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql))), + Result::resultAsString); + Assert.fail("Exception is expected"); + } catch (Exception e) { + final Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + final List actualParams = VIRTUALIZE_JDBC_QUERY_WRONG_PARAMS.keySet().stream() + .map(s -> "$" + s) + .sorted() + .toList(); + assertEquals(String.format("Expected query parameters are %s, actual are %s", sortedExpectedParams, actualParams), rootCause.getMessage()); + } + } +} diff --git a/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTest.java b/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTest.java new file mode 100644 index 0000000000..242282379b --- /dev/null +++ b/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTest.java @@ -0,0 +1,198 @@ +package apoc.full.it.dv; + +import apoc.create.Create; +import apoc.dv.DataVirtualizationCatalog; +import apoc.dv.DataVirtualizationCatalogTestUtil; +import apoc.load.Jdbc; +import apoc.load.LoadCsv; +import apoc.util.TestUtil; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.neo4j.graphdb.Result; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.MySQLContainer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static apoc.dv.DataVirtualizationCatalog.DIRECTION_CONF_KEY; +import static apoc.dv.DataVirtualizationCatalogTestUtil.*; +import static apoc.util.MapUtil.map; +import static apoc.util.TestUtil.getUrlFileName; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testCallEmpty; +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class DataVirtualizationCatalogTest { + + public static JdbcDatabaseContainer mysql; + + @Rule + public DbmsRule db = new ImpermanentDbmsRule(); + + @Before + public void setUp() throws Exception { + TestUtil.registerProcedure(db, DataVirtualizationCatalog.class, Jdbc.class, LoadCsv.class, Create.class); + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + } + + @BeforeClass + public static void setUpContainer() { + mysql = new MySQLContainer().withInitScript("init_mysql.sql"); + mysql.start(); + } + + @AfterClass + public static void tearDownContainer() { + mysql.stop(); + } + + @Test + public void testVirtualizeCSV() { + final String url = getUrlFileName("test.csv").toString(); + getVirtualizeCSVCommonResult(db, APOC_DV_ADD_QUERY, APOC_DV_LIST, url, db); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, CONFIG_VALUE), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContent); + + } + + @Test + public void testVirtualizeCSVWithCustomDirectionIN() { + final String url = getUrlFileName("test.csv").toString(); + getVirtualizeCSVCommonResult(db, APOC_DV_ADD_QUERY, APOC_DV_LIST, url, db); + + Map config = new HashMap<>(CONFIG_VALUE); + config.put(DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name()); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, config), + DataVirtualizationCatalogTestUtil::assertVirtualizeCSVQueryAndLinkContentDirectionIN); + } + + @Test + public void testVirtualizeJDBC() { + getVirtualizeJDBCCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql))), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); + } + + @Test + public void testVirtualizeJDBCWithCustomDirectionIN() { + getVirtualizeJDBCCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); + + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_APOC_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); + } + + @Test + public void testVirtualizeJDBCWithParameterMap() { + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map(CREDENTIALS_KEY, getJdbcCredentials(mysql)) + ), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContent); + } + + @Test + public void testVirtualizeJDBCWithParameterMapAndDirectionIN() { + getVirtualizeJDBCWithParamsCommonResult(db, mysql, APOC_DV_ADD_QUERY, db); + + testCall(db, APOC_DV_QUERY_AND_LINK_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, VIRTUALIZE_JDBC_QUERY_PARAMS, RELTYPE_KEY, VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE, + CONFIG_KEY, map( + DIRECTION_CONF_KEY, DataVirtualizationCatalog.Direction.IN.name(), + CREDENTIALS_KEY, getJdbcCredentials(mysql) + )), + DataVirtualizationCatalogTestUtil::assertVirtualizeJDBCQueryAndLinkContentDirectionIN); + } + + @Test + public void testRemove() { + db.executeTransactionally(APOC_DV_ADD_QUERY, + map("name", JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY))); + + testCallEmpty(db, "CALL apoc.dv.catalog.remove($name)", map("name", JDBC_NAME)); + } + + @Test + public void testNameAsKey() { + Map params = map( + NAME_KEY, JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY) + ); + + db.executeTransactionally(APOC_DV_ADD_QUERY, params); + db.executeTransactionally(APOC_DV_ADD_QUERY, params); + testResult(db, "CALL apoc.dv.catalog.list()", + map(), + (result) -> assertEquals(1, result.stream().count())); + } + + @Test + public void testJDBCQueryWithMixedParamsTypes() { + try { + db.executeTransactionally(APOC_DV_ADD_QUERY, + map("name", JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, JDBC_SELECT_QUERY_WITH_PARAM))); + Assert.fail("Exception is expected"); + } catch (Exception e) { + final Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + assertEquals("The query is mixing parameters with `$` and `?` please use just one notation", rootCause.getMessage()); + } + } + + @Test + public void testVirtualizeJDBCWithDifferentParameterMap() { + final String url = mysql.getJdbcUrl() + "?useSSL=false"; + testCall(db, APOC_DV_ADD_QUERY, + map("name", JDBC_NAME, "map", getVirtualizeJDBCParameterMap(mysql, VIRTUALIZE_JDBC_WITH_PARAMS_QUERY)), + (row) -> assertDvCatalogAddOrInstall(row, url)); + + String country = "Netherlands"; + String code2 = "NL"; + String headOfState = "Beatrix"; + Map queryParams = map("foo", country, "bar", code2, "baz", headOfState); + + try { + db.executeTransactionally(APOC_DV_QUERY, + map(NAME_KEY, JDBC_NAME, APOC_DV_QUERY_PARAMS_KEY, queryParams, + CONFIG_KEY, getJdbcCredentials(mysql)), + Result::resultAsString); + Assert.fail("Exception is expected"); + } catch (Exception e) { + final Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + final List actualParams = queryParams.keySet().stream() + .map(s -> "$" + s) + .sorted() + .toList(); + assertEquals(String.format("Expected query parameters are %s, actual are %s", EXPECTED_LIST_SORTED, actualParams), rootCause.getMessage()); + } + } + +} diff --git a/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTestUtil.java b/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTestUtil.java new file mode 100644 index 0000000000..ba91df9a50 --- /dev/null +++ b/full-it/src/test/java/apoc/full/it/dv/DataVirtualizationCatalogTestUtil.java @@ -0,0 +1,297 @@ +package apoc.full.it.dv; + +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Path; +import org.neo4j.graphdb.QueryExecutionException; +import org.neo4j.graphdb.Relationship; +import org.testcontainers.containers.JdbcDatabaseContainer; + +import java.util.List; +import java.util.Map; + +import static apoc.util.MapUtil.map; +import static apoc.util.TestUtil.getUrlFileName; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testCallCount; +import static apoc.util.TestUtil.testCallEmpty; +import static org.junit.Assert.assertEquals; +import static org.neo4j.configuration.GraphDatabaseSettings.DEFAULT_DATABASE_NAME; + +public class DataVirtualizationCatalogTestUtil { + + // -- Constants + public static final String AGE_KEY = "age"; + public static final String MAP_KEY = "map"; + public static final String APOC_DV_QUERY_PARAMS_KEY = "queryParams"; + public static final String CONFIG_KEY = "config"; + public static final Map CONFIG_VALUE = map("header", true); + public static final String CSV_NAME_VALUE = "csv_vr"; + public static final String CSV_TEST_FILE = "test.csv"; + public static final String DATABASE_NAME = "databaseName"; + public static final String DESC_KEY = "desc"; + public static final String DESC_VALUE = "person's details"; + public static final List EXPECTED_LIST = List.of("$name", "$head_of_state", "$CODE2"); + public static final List EXPECTED_LIST_SORTED = List.of("$name", "$head_of_state", "$CODE2").stream().sorted().toList(); + public static final String FILE_URL = getUrlFileName(CSV_TEST_FILE).toString(); + public static final String HOOK_NODE_NAME_KEY = "hookNodeName"; + public static final String HOOK_NODE_NAME_VALUE = "node to test linking"; + public static final String JDBC_VALUE = "JDBC"; + public static final String HOOK_LABEL = "Hook"; + public static final String CREDENTIALS_KEY = "credentials"; + public static List LABELS = List.of("Person"); + public static final String LABELS_KEY = "labels"; + public static final String LABELS_VALUE = "Person"; + public static final String NAME_KEY = "name"; + public static final String NODE_KEY = "node"; + public static final String PARAMS_KEY = "params"; + public static final List PARAMS_VALUE = List.of("$name", "$age"); + public static final String NAME_VALUE = "Rana"; + public static final String AGE_VALUE = "11"; + public static final String QUERY_KEY = "query"; + public static final String QUERY_VALUE = "map.name = $name and map.age = $age"; + public static final String RELTYPE_KEY = "relType"; + public static final String RELTYPE_VALUE = "LINKED_TO"; + public static final String TYPE_KEY = "type"; + public static final String TYPE_VALUE = "CSV"; + public static final String URL_KEY = "url"; + public static final String VIRTUALIZE_JDBC_QUERY = "SELECT * FROM country WHERE Name = ?"; + public static final String JDBC_SELECT_QUERY = "SELECT * FROM country WHERE Name = $name"; + public static final String JDBC_SELECT_QUERY_WITH_PARAM = "SELECT * FROM country WHERE Name = $name AND param_with_question_mark = ? "; + public static final String VIRTUALIZE_JDBC_COUNTRY = "Netherlands"; + public static final String CODE2 = "NL"; + public static final String HEAD_OF_STATE = "Beatrix"; + public static final List VIRTUALIZE_JDBC_APOC_PARAMS = List.of(VIRTUALIZE_JDBC_COUNTRY); + public static final Map VIRTUALIZE_JDBC_QUERY_PARAMS = map(NAME_KEY, VIRTUALIZE_JDBC_COUNTRY, "CODE2", CODE2, "head_of_state", HEAD_OF_STATE); + public static final Map VIRTUALIZE_JDBC_QUERY_WRONG_PARAMS = map("foo", VIRTUALIZE_JDBC_COUNTRY, "bar", CODE2, "baz", HEAD_OF_STATE); + public static final String VIRTUALIZE_JDBC_WITH_PARAMS_RELTYPE = "LINKED_TO_NEW"; + public static final String JDBC_NAME = "jdbc_vr"; + public static final String JDBC_DESC = "country details"; + public static final List