Skip to content

Commit

Permalink
[gRNAet7B] Changed cluster get driver method
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Jan 10, 2023
1 parent 2b312b3 commit 040fb29
Showing 1 changed file with 50 additions and 17 deletions.
67 changes: 50 additions & 17 deletions extended/src/test/java/apoc/trigger/TriggerClusterTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package apoc.trigger;

import apoc.util.Neo4jContainerExtension;
import apoc.util.TestContainerUtil;
import apoc.util.TestContainerUtil.ApocPackage;
import apoc.util.TestcontainersCausalCluster;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.neo4j.driver.Driver;
import org.neo4j.driver.types.Node;
import org.neo4j.driver.Session;

Expand All @@ -24,6 +26,8 @@
public class TriggerClusterTest {

private static TestcontainersCausalCluster cluster;

private static Driver driver;

@BeforeClass
public static void setupCluster() {
Expand All @@ -33,6 +37,8 @@ public static void setupCluster() {
1,
Collections.emptyMap(),
Map.of("apoc.trigger.refresh", "100", "apoc.trigger.enabled", "true"));

driver = getWriteDriver();
}

@AfterClass
Expand All @@ -42,13 +48,13 @@ public static void bringDownCluster() {

@Before
public void before() {
cluster.getSession().run("CALL apoc.trigger.removeAll()");
driver.session().run("CALL apoc.trigger.removeAll()");
cluster.getSession().run("MATCH (n) DETACH DELETE n");
}

@Test
public void testTimeStampTriggerForUpdatedProperties() throws Exception {
cluster.getSession().run("CALL apoc.trigger.add('timestamp','UNWIND apoc.trigger.nodesByLabel($assignedNodeProperties,null) AS n SET n.ts = timestamp()',{})");
driver.session().run("CALL apoc.trigger.add('timestamp','UNWIND apoc.trigger.nodesByLabel($assignedNodeProperties,null) AS n SET n.ts = timestamp()',{})");
cluster.getSession().run("CREATE (f:Foo) SET f.foo='bar'");
TestContainerUtil.testCall(cluster.getSession(), "MATCH (f:Foo) RETURN f", (row) -> {
assertEquals(true, ((Node)row.get("f")).containsKey("ts"));
Expand All @@ -57,17 +63,17 @@ public void testTimeStampTriggerForUpdatedProperties() throws Exception {

@Test
public void testReplication() throws Exception {
cluster.getSession().run("CALL apoc.trigger.add('timestamp','UNWIND apoc.trigger.nodesByLabel($assignedNodeProperties,null) AS n SET n.ts = timestamp()',{})");
driver.session().run("CALL apoc.trigger.add('timestamp','UNWIND apoc.trigger.nodesByLabel($assignedNodeProperties,null) AS n SET n.ts = timestamp()',{})");
// Test that the trigger is present in another instance
org.neo4j.test.assertion.Assert.assertEventually(() -> cluster.getDriver().session()
org.neo4j.test.assertion.Assert.assertEventually(() -> driver.session()
.readTransaction(tx -> tx.run("CALL apoc.trigger.list() YIELD name RETURN name").single().get("name").asString()),
(value) -> "timestamp".equals(value), 30, TimeUnit.SECONDS);
}

@Test
public void testLowerCaseName() throws Exception {
cluster.getSession().run("create constraint on (p:Person) assert p.id is unique");
cluster.getSession().run("CALL apoc.trigger.add('lowercase','UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n.id = toLower(n.name)',{})");
driver.session().run("CALL apoc.trigger.add('lowercase','UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n.id = toLower(n.name)',{})");
cluster.getSession().run("CREATE (f:Person {name:'John Doe'})");
TestContainerUtil.testCall(cluster.getSession(), "MATCH (f:Person) RETURN f", (row) -> {
assertEquals("john doe", ((Node)row.get("f")).get("id").asString());
Expand All @@ -78,7 +84,7 @@ public void testLowerCaseName() throws Exception {
@Test
public void testSetLabels() throws Exception {
cluster.getSession().run("CREATE (f {name:'John Doe'})");
cluster.getSession().run("CALL apoc.trigger.add('setlabels','UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n:Man',{})");
driver.session().run("CALL apoc.trigger.add('setlabels','UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n:Man',{})");
cluster.getSession().run("MATCH (f) SET f:Person");
TestContainerUtil.testCall(cluster.getSession(), "MATCH (f:Man) RETURN f", (row) -> {
assertEquals("John Doe", ((Node)row.get("f")).get("name").asString());
Expand All @@ -91,7 +97,7 @@ public void testSetLabels() throws Exception {

@Test
public void testTxIdAfterAsync() throws Exception {
cluster.getSession().run("CALL apoc.trigger.add('triggerTest','UNWIND apoc.trigger.propertiesByKey($assignedNodeProperties, \"_executed\") as prop " +
driver.session().run("CALL apoc.trigger.add('triggerTest','UNWIND apoc.trigger.propertiesByKey($assignedNodeProperties, \"_executed\") as prop " +
" WITH prop.node as n " +
" CREATE (z:SON {father:id(n)}) " +
" CREATE (n)-[:GENERATED]->(z)', " +
Expand All @@ -109,11 +115,11 @@ public void testTxIdAfterAsync() throws Exception {
@Test
public void testTimeStampTriggerForUpdatedPropertiesNewProcedures() throws Exception {
final String name = "timestampUpdate";
try (final Session session = cluster.getDriver().session(forDatabase(SYSTEM_DATABASE_NAME))) {
try (final Session session = driver.session(forDatabase(SYSTEM_DATABASE_NAME))) {
session.run("CALL apoc.trigger.install('neo4j', $name,'UNWIND apoc.trigger.nodesByLabel($assignedNodeProperties,null) AS n SET n.ts = timestamp()',{})",
Map.of("name", name));
}
try (final Session session = cluster.getSession()) {
try (final Session session = driver.session()) {
awaitProcedureInstalled(session, "timestampUpdate");
session.run("CREATE (f:Foo) SET f.foo='bar'");
TestContainerUtil.testCall(session, "MATCH (f:Foo) RETURN f", (row) -> {
Expand All @@ -124,21 +130,21 @@ public void testTimeStampTriggerForUpdatedPropertiesNewProcedures() throws Excep

@Test
public void testReplicationNewProcedures() throws Exception {
try (final Session session = cluster.getDriver().session(forDatabase(SYSTEM_DATABASE_NAME))) {
try (final Session session = driver.session(forDatabase(SYSTEM_DATABASE_NAME))) {
session.run("CALL apoc.trigger.install('neo4j', 'timestamp','UNWIND apoc.trigger.nodesByLabel($assignedNodeProperties,null) AS n SET n.ts = timestamp()',{})");
}
// Test that the trigger is present in another instance
awaitProcedureInstalled(cluster.getDriver().session(), "timestamp");
awaitProcedureInstalled(driver.session(), "timestamp");
}

@Test
public void testLowerCaseNameNewProcedures() {
final String name = "lowercase";
try (final Session session = cluster.getDriver().session(forDatabase(SYSTEM_DATABASE_NAME))) {
try (final Session session = driver.session(forDatabase(SYSTEM_DATABASE_NAME))) {
session.run("CALL apoc.trigger.install('neo4j', $name, 'UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n.id = toLower(n.name)',{})",
Map.of("name", name));
}
try (final Session session = cluster.getSession()) {
try (final Session session = driver.session()) {
session.run("create constraint on (p:Person) assert p.id is unique");
awaitProcedureInstalled(session, name);

Expand All @@ -153,11 +159,11 @@ public void testLowerCaseNameNewProcedures() {
@Test
public void testSetLabelsNewProcs() throws Exception {
final String name = "testSetLabels";
try (final Session session = cluster.getDriver().session(forDatabase(SYSTEM_DATABASE_NAME))) {
try (final Session session = driver.session(forDatabase(SYSTEM_DATABASE_NAME))) {
session.run("CALL apoc.trigger.install('neo4j', $name,'UNWIND apoc.trigger.nodesByLabel($assignedLabels,\"Person\") AS n SET n:Man',{})",
Map.of("name", name));
}
try (final Session session = cluster.getSession()) {
try (final Session session = driver.session()) {
session.run("CREATE (f:Test {name:'John Doe'})");

awaitProcedureInstalled(session, name);
Expand All @@ -176,15 +182,15 @@ public void testSetLabelsNewProcs() throws Exception {
@Test
public void testTxIdAfterAsyncNewProcedures() throws Exception {
final String name = "testTxIdAfterAsync";
try (final Session session = cluster.getDriver().session(forDatabase(SYSTEM_DATABASE_NAME))) {
try (final Session session = driver.session(forDatabase(SYSTEM_DATABASE_NAME))) {
session.run("CALL apoc.trigger.install('neo4j', $name, 'UNWIND apoc.trigger.propertiesByKey($assignedNodeProperties, \"_executed\") as prop " +
" WITH prop.node as n " +
" CREATE (z:SON {father:id(n)}) " +
" CREATE (n)-[:GENERATED]->(z)', " +
"{phase:'afterAsync'})",
Map.of("name", name));
}
try (final Session session = cluster.getSession()) {
try (final Session session = driver.session()) {
awaitProcedureInstalled(session, name);

session.run("CREATE (:TEST {name:'x', _executed:0})");
Expand All @@ -194,6 +200,33 @@ public void testTxIdAfterAsyncNewProcedures() throws Exception {
}
}

private static Driver getWriteDriver() {
// get the 1st system write driver found
final List<Neo4jContainerExtension> members = cluster.getClusterMembers();
for (Neo4jContainerExtension container: members) {
final String readReplica = TestcontainersCausalCluster.ClusterInstanceType.READ_REPLICA.toString();
final Driver driver = container.getDriver();
if (readReplica.equals(container.getEnvMap().get("NEO4J_dbms_mode")) || driver == null) {
continue;
}
final String address = container.getEnvMap().get("NEO4J_dbms_connector_bolt_advertised__address");
final boolean isSysWriter = dbIsWriter(driver, address);
if (isSysWriter) {
return driver;
}
}
throw new RuntimeException("No write member found");
}

private static boolean dbIsWriter(Driver driver, String address) {
final Session session = driver.session(forDatabase(SYSTEM_DATABASE_NAME));

return session.run( "SHOW DATABASE $dbName WHERE address = $address",
Map.of("dbName", SYSTEM_DATABASE_NAME, "address", address) )
.single().get("writer")
.asBoolean();
}

private static void awaitProcedureInstalled(Session session, String name) {
assertEventually(() -> session
.readTransaction(tx -> tx.run("CALL apoc.trigger.list")
Expand Down

0 comments on commit 040fb29

Please sign in to comment.