diff --git a/core/src/main/java/apoc/trigger/Trigger.java b/core/src/main/java/apoc/trigger/Trigger.java index e10c02256..98f7cb441 100644 --- a/core/src/main/java/apoc/trigger/Trigger.java +++ b/core/src/main/java/apoc/trigger/Trigger.java @@ -3,6 +3,7 @@ import apoc.util.Util; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.logging.Log; +import org.neo4j.procedure.Admin; import org.neo4j.procedure.Context; import org.neo4j.procedure.Description; import org.neo4j.procedure.Mode; @@ -20,7 +21,7 @@ public class Trigger { // public for testing purpose - public static final String SYS_NON_WRITER_ERROR = """ + public static final String DB_NON_WRITER_ERROR = """ This instance is not allowed to write to the database because it is a follower. Please use these procedures against the database leader. """; @@ -41,10 +42,11 @@ private void preprocessDeprecatedProcedures() { log.warn(msgDeprecation); if (!Util.isWriteableInstance(db)) { - throw new RuntimeException(SYS_NON_WRITER_ERROR + msgDeprecation); + throw new RuntimeException(DB_NON_WRITER_ERROR + msgDeprecation); } } + @Admin @Deprecated @Procedure(name = "apoc.trigger.add", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.install") @Description("Adds a trigger to the given Cypher statement.\n" + @@ -63,6 +65,7 @@ public Stream add(@Name("name") String name, @Name("kernelTransacti return Stream.of(new TriggerInfo(name,statement,selector, params,true, false)); } + @Admin @Deprecated @Procedure(name = "apoc.trigger.remove", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.drop") @Description("Removes the given trigger.") @@ -76,6 +79,7 @@ public Stream remove(@Name("name")String name) { return Stream.of(new TriggerInfo(name,(String)removed.get("statement"), (Map) removed.get("selector"), (Map) removed.get("params"),false, false)); } + @Admin @Deprecated @Procedure(name = "apoc.trigger.removeAll", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.dropAll") @Description("Removes all previously added triggers.") @@ -89,6 +93,7 @@ public Stream removeAll() { return removed.entrySet().stream().map(TriggerInfo::entryToTriggerInfo); } + @Admin @Procedure(name = "apoc.trigger.list", mode = Mode.READ) @Description("Lists all installed triggers.") public Stream list() { @@ -102,6 +107,7 @@ public Stream list() { ); } + @Admin @Deprecated @Procedure(name = "apoc.trigger.pause", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.stop") @Description("Pauses the given trigger.") @@ -116,6 +122,7 @@ public Stream pause(@Name("name")String name) { (Map) paused.get("params"),true, true)); } + @Admin @Deprecated @Procedure(name = "apoc.trigger.resume", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.start") @Description("Resumes the given paused trigger.") diff --git a/core/src/main/java/apoc/trigger/TriggerNewProcedures.java b/core/src/main/java/apoc/trigger/TriggerNewProcedures.java index 48ad7936d..a34845bde 100644 --- a/core/src/main/java/apoc/trigger/TriggerNewProcedures.java +++ b/core/src/main/java/apoc/trigger/TriggerNewProcedures.java @@ -6,6 +6,7 @@ import org.neo4j.kernel.api.procedure.SystemProcedure; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.logging.Log; +import org.neo4j.procedure.Admin; import org.neo4j.procedure.Context; import org.neo4j.procedure.Description; import org.neo4j.procedure.Mode; @@ -21,7 +22,7 @@ public class TriggerNewProcedures { // public for testing purpose - public static final String TRIGGER_NOT_ROUTED_ERROR = "The procedure should be routed and executed against a writer system database"; + public static final String TRIGGER_NOT_ROUTED_ERROR = "The procedure should be routed and executed against a writer database"; public static final String TRIGGER_BAD_TARGET_ERROR = "Triggers can only be installed on user databases."; @Context public GraphDatabaseAPI db; @@ -46,6 +47,7 @@ private void checkTargetDatabase(String databaseName) { // TODO - change with @SystemOnlyProcedure @SystemProcedure + @Admin @Procedure(mode = Mode.WRITE) @Description("CALL apoc.trigger.install(databaseName, name, statement, selector, config) | eventually adds a trigger for a given database which is invoked when a successful transaction occurs.") public Stream install(@Name("databaseName") String databaseName, @Name("name") String name, @Name("statement") String statement, @Name(value = "selector") Map selector, @Name(value = "config", defaultValue = "{}") Map config) { @@ -64,6 +66,7 @@ public Stream install(@Name("databaseName") String databaseName, @N // TODO - change with @SystemOnlyProcedure @SystemProcedure + @Admin @Procedure(mode = Mode.WRITE) @Description("CALL apoc.trigger.drop(databaseName, name) | eventually removes an existing trigger, returns the trigger's information") public Stream drop(@Name("databaseName") String databaseName, @Name("name")String name) { @@ -78,6 +81,7 @@ public Stream drop(@Name("databaseName") String databaseName, @Name // TODO - change with @SystemOnlyProcedure @SystemProcedure + @Admin @Procedure(mode = Mode.WRITE) @Description("CALL apoc.trigger.dropAll(databaseName) | eventually removes all previously added trigger, returns triggers' information") public Stream dropAll(@Name("databaseName") String databaseName) { @@ -88,6 +92,7 @@ public Stream dropAll(@Name("databaseName") String databaseName) { // TODO - change with @SystemOnlyProcedure @SystemProcedure + @Admin @Procedure(mode = Mode.WRITE) @Description("CALL apoc.trigger.stop(databaseName, name) | eventually pauses the trigger") public Stream stop(@Name("databaseName") String databaseName, @Name("name")String name) { @@ -99,6 +104,7 @@ public Stream stop(@Name("databaseName") String databaseName, @Name // TODO - change with @SystemOnlyProcedure @SystemProcedure + @Admin @Procedure(mode = Mode.WRITE) @Description("CALL apoc.trigger.start(databaseName, name) | eventually unpauses the paused trigger") public Stream start(@Name("databaseName") String databaseName, @Name("name")String name) { diff --git a/core/src/test/java/apoc/trigger/TriggerClusterRoutingTest.java b/core/src/test/java/apoc/trigger/TriggerClusterRoutingTest.java index ab52d6443..c3dd06891 100644 --- a/core/src/test/java/apoc/trigger/TriggerClusterRoutingTest.java +++ b/core/src/test/java/apoc/trigger/TriggerClusterRoutingTest.java @@ -6,7 +6,9 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.neo4j.driver.AuthTokens; import org.neo4j.driver.Driver; +import org.neo4j.driver.GraphDatabase; import org.neo4j.driver.Session; import org.neo4j.driver.SessionConfig; @@ -15,10 +17,14 @@ import java.util.Map; import java.util.UUID; +import static apoc.trigger.Trigger.DB_NON_WRITER_ERROR; +import static apoc.trigger.TriggerNewProcedures.TRIGGER_NOT_ROUTED_ERROR; import static apoc.util.TestContainerUtil.testCall; import static org.junit.Assert.assertEquals; import static org.neo4j.configuration.GraphDatabaseSettings.DEFAULT_DATABASE_NAME; import static org.neo4j.configuration.GraphDatabaseSettings.SYSTEM_DATABASE_NAME; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TriggerClusterRoutingTest { private static TestcontainersCausalCluster cluster; @@ -46,42 +52,127 @@ public static void bringDownCluster() { @Test public void testTriggerAddAllowedOnlyInSysLeaderMember() { final String query = "CALL apoc.trigger.add($name, 'RETURN 1', {})"; - triggerInSysWriterMemberCommon(query, DEFAULT_DATABASE_NAME); + triggerInSysLeaderMemberCommon(query, DB_NON_WRITER_ERROR, DEFAULT_DATABASE_NAME); } @Test public void testTriggerRemoveAllowedOnlyInSysLeaderMember() { final String query = "CALL apoc.trigger.remove($name)"; - triggerInSysWriterMemberCommon(query, DEFAULT_DATABASE_NAME); + triggerInSysLeaderMemberCommon(query, DB_NON_WRITER_ERROR, DEFAULT_DATABASE_NAME); } @Test - public void testTriggerInstallAllowedOnlyInSysWriterMember() { + public void testTriggerInstallAllowedOnlyInSysLeaderMember() { final String query = "CALL apoc.trigger.install('neo4j', $name, 'RETURN 1', {})"; - triggerInSysWriterMemberCommon(query, SYSTEM_DATABASE_NAME); + triggerInSysLeaderMemberCommon(query, TRIGGER_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME); } @Test - public void testTriggerDropAllowedOnlyInSysWriterMember() { + public void testTriggerDropAllowedOnlyInSysLeaderMember() { final String query = "CALL apoc.trigger.drop('neo4j', $name)"; - triggerInSysWriterMemberCommon(query, SYSTEM_DATABASE_NAME); + triggerInSysLeaderMemberCommon(query, TRIGGER_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME); } - private static void triggerInSysWriterMemberCommon(String query, String dbName) { + private static void triggerInSysLeaderMemberCommon(String query, String triggerNotRoutedError, String dbName) { final List members = cluster.getClusterMembers(); assertEquals(4, members.size()); for (Neo4jContainerExtension container: members) { // we skip READ_REPLICA members - final String readReplica = TestcontainersCausalCluster.ClusterInstanceType.READ_REPLICA.toString(); - final Driver driver = container.getDriver(); - if (readReplica.equals(container.getEnvMap().get("NEO4J_dbms_mode")) || driver == null) { + final Driver driver = getDriverIfNotReplica(container); + if (driver == null) { continue; } + System.out.println("TriggerClusterRoutingTest.triggerInSysLeaderMemberCommon"); Session session = driver.session(SessionConfig.forDatabase(dbName)); - final String name = UUID.randomUUID().toString(); - testCall( session, query, - Map.of("name", name), - row -> assertEquals(name, row.get("name")) ); + final String address = container.getEnvMap().get("NEO4J_dbms_connector_bolt_advertised__address"); + if (dbIsWriter(session, dbName, address)) { + final String name = UUID.randomUUID().toString(); + testCall( session, query, + Map.of("name", name), + row -> assertEquals(name, row.get("name")) ); + } else { + try { + testCall(session, query, + Map.of("name", UUID.randomUUID().toString()), + row -> fail("Should fail because of non writer trigger addition")); + } catch (Exception e) { + String errorMsg = e.getMessage(); + assertTrue("The actual message is: " + errorMsg, errorMsg.contains(triggerNotRoutedError)); + } + } + } + } + + @Test + public void testTriggersAllowedOnlyWithAdmin() { + for (Neo4jContainerExtension container: cluster.getClusterMembers()) { + // we skip READ_REPLICA members + final Driver driver = getDriverIfNotReplica(container); + if (driver == null) { + continue; + } + System.out.println("TriggerClusterRoutingTest.testTriggersAllowedOnlyWithAdmin"); + final String address = container.getEnvMap().get("NEO4J_dbms_connector_bolt_advertised__address"); + String noAdminUser = "nonadmin"; + String noAdminPwd = "test1234"; + try (Session sysSession = driver.session(SessionConfig.forDatabase(SYSTEM_DATABASE_NAME))) { + if (!dbIsWriter(sysSession, SYSTEM_DATABASE_NAME, address)) { + return; + } + sysSession.run(String.format("CREATE USER %s SET PASSWORD '%s' SET PASSWORD CHANGE NOT REQUIRED", + noAdminUser, noAdminPwd)); + } + + try (Driver userDriver = GraphDatabase.driver(cluster.getURI(), AuthTokens.basic(noAdminUser, noAdminPwd))) { + + try (Session sysUserSession = userDriver.session(SessionConfig.forDatabase(SYSTEM_DATABASE_NAME))) { + failsWithNonAdminUser(sysUserSession, "apoc.trigger.install", "call apoc.trigger.install('neo4j', 'qwe', 'return 1', {})"); + failsWithNonAdminUser(sysUserSession, "apoc.trigger.drop", "call apoc.trigger.drop('neo4j', 'qwe')"); + failsWithNonAdminUser(sysUserSession, "apoc.trigger.dropAll", "call apoc.trigger.dropAll('neo4j')"); + failsWithNonAdminUser(sysUserSession, "apoc.trigger.stop", "call apoc.trigger.stop('neo4j', 'qwe')"); + failsWithNonAdminUser(sysUserSession, "apoc.trigger.start", "call apoc.trigger.start('neo4j', 'qwe')"); + failsWithNonAdminUser(sysUserSession, "apoc.trigger.show", "call apoc.trigger.show('neo4j')"); + } + + try (Session neo4jUserSession = userDriver.session(SessionConfig.forDatabase(DEFAULT_DATABASE_NAME))) { + failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.add", "call apoc.trigger.add('abc', 'return 1', {})"); + failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.remove", "call apoc.trigger.remove('abc')"); + failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.removeAll", "call apoc.trigger.removeAll"); + failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.pause", "call apoc.trigger.pause('abc')"); + failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.resume", "call apoc.trigger.resume('abc')"); + failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.list", "call apoc.trigger.list"); + } + } } } + + private static Driver getDriverIfNotReplica(Neo4jContainerExtension container) { + final String readReplica = TestcontainersCausalCluster.ClusterInstanceType.READ_REPLICA.toString(); + final Driver driver = container.getDriver(); + if (readReplica.equals(container.getEnvMap().get("NEO4J_dbms_mode")) || driver == null) { + return null; + } + return driver; + } + + private static boolean dbIsWriter(Session session, String dbName, String address) { + return session.run( "SHOW DATABASE $dbName WHERE address = $address", + Map.of("dbName", dbName, "address", address) ) + .single().get("writer") + .asBoolean(); + } + + private void failsWithNonAdminUser(Session session, String procName, String query) { + try { + testCall(session, query, + row -> fail("Should fail because of non admin user") ); + } catch (Exception e) { + String actual = e.getMessage(); + final String expected = String.format("Executing admin procedure '%s' permission has not been granted for user 'nonadmin'", + procName); + assertTrue("Actual error message is: " + actual, actual.contains(expected)); + } + } + + }