Skip to content

Commit

Permalink
[qwJFy8pp] Add @Admin to trigger procedures (neo4j-contrib/neo4j-apoc…
Browse files Browse the repository at this point in the history
…-procedures#3357)

* [qwJFy8pp] Add @Admin to trigger procedures
* added admin in trigger.list and docs note
* changed admin doc
  • Loading branch information
vga91 committed Dec 22, 2022
1 parent 5b741d2 commit 26cb203
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 17 deletions.
11 changes: 9 additions & 2 deletions core/src/main/java/apoc/trigger/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import apoc.util.Util;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Admin;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
Expand All @@ -20,7 +21,7 @@

public class Trigger {
// public for testing purpose
public static final String SYS_NON_WRITER_ERROR = """
public static final String DB_NON_WRITER_ERROR = """
This instance is not allowed to write to the database because it is a follower.
Please use these procedures against the database leader.
""";
Expand All @@ -41,10 +42,11 @@ private void preprocessDeprecatedProcedures() {
log.warn(msgDeprecation);

if (!Util.isWriteableInstance(db)) {
throw new RuntimeException(SYS_NON_WRITER_ERROR + msgDeprecation);
throw new RuntimeException(DB_NON_WRITER_ERROR + msgDeprecation);
}
}

@Admin
@Deprecated
@Procedure(name = "apoc.trigger.add", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.install")
@Description("Adds a trigger to the given Cypher statement.\n" +
Expand All @@ -63,6 +65,7 @@ public Stream<TriggerInfo> add(@Name("name") String name, @Name("kernelTransacti
return Stream.of(new TriggerInfo(name,statement,selector, params,true, false));
}

@Admin
@Deprecated
@Procedure(name = "apoc.trigger.remove", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.drop")
@Description("Removes the given trigger.")
Expand All @@ -76,6 +79,7 @@ public Stream<TriggerInfo> remove(@Name("name")String name) {
return Stream.of(new TriggerInfo(name,(String)removed.get("statement"), (Map<String, Object>) removed.get("selector"), (Map<String, Object>) removed.get("params"),false, false));
}

@Admin
@Deprecated
@Procedure(name = "apoc.trigger.removeAll", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.dropAll")
@Description("Removes all previously added triggers.")
Expand All @@ -89,6 +93,7 @@ public Stream<TriggerInfo> removeAll() {
return removed.entrySet().stream().map(TriggerInfo::entryToTriggerInfo);
}

@Admin
@Procedure(name = "apoc.trigger.list", mode = Mode.READ)
@Description("Lists all installed triggers.")
public Stream<TriggerInfo> list() {
Expand All @@ -102,6 +107,7 @@ public Stream<TriggerInfo> list() {
);
}

@Admin
@Deprecated
@Procedure(name = "apoc.trigger.pause", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.stop")
@Description("Pauses the given trigger.")
Expand All @@ -116,6 +122,7 @@ public Stream<TriggerInfo> pause(@Name("name")String name) {
(Map<String,Object>) paused.get("params"),true, true));
}

@Admin
@Deprecated
@Procedure(name = "apoc.trigger.resume", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.start")
@Description("Resumes the given paused trigger.")
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/apoc/trigger/TriggerNewProcedures.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.neo4j.kernel.api.procedure.SystemProcedure;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Admin;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
Expand All @@ -21,7 +22,7 @@

public class TriggerNewProcedures {
// public for testing purpose
public static final String TRIGGER_NOT_ROUTED_ERROR = "The procedure should be routed and executed against a writer system database";
public static final String TRIGGER_NOT_ROUTED_ERROR = "The procedure should be routed and executed against a writer database";
public static final String TRIGGER_BAD_TARGET_ERROR = "Triggers can only be installed on user databases.";

@Context public GraphDatabaseAPI db;
Expand All @@ -46,6 +47,7 @@ private void checkTargetDatabase(String databaseName) {

// TODO - change with @SystemOnlyProcedure
@SystemProcedure
@Admin
@Procedure(mode = Mode.WRITE)
@Description("CALL apoc.trigger.install(databaseName, name, statement, selector, config) | eventually adds a trigger for a given database which is invoked when a successful transaction occurs.")
public Stream<TriggerInfo> install(@Name("databaseName") String databaseName, @Name("name") String name, @Name("statement") String statement, @Name(value = "selector") Map<String,Object> selector, @Name(value = "config", defaultValue = "{}") Map<String,Object> config) {
Expand All @@ -64,6 +66,7 @@ public Stream<TriggerInfo> install(@Name("databaseName") String databaseName, @N

// TODO - change with @SystemOnlyProcedure
@SystemProcedure
@Admin
@Procedure(mode = Mode.WRITE)
@Description("CALL apoc.trigger.drop(databaseName, name) | eventually removes an existing trigger, returns the trigger's information")
public Stream<TriggerInfo> drop(@Name("databaseName") String databaseName, @Name("name")String name) {
Expand All @@ -78,6 +81,7 @@ public Stream<TriggerInfo> drop(@Name("databaseName") String databaseName, @Name

// TODO - change with @SystemOnlyProcedure
@SystemProcedure
@Admin
@Procedure(mode = Mode.WRITE)
@Description("CALL apoc.trigger.dropAll(databaseName) | eventually removes all previously added trigger, returns triggers' information")
public Stream<TriggerInfo> dropAll(@Name("databaseName") String databaseName) {
Expand All @@ -88,6 +92,7 @@ public Stream<TriggerInfo> dropAll(@Name("databaseName") String databaseName) {

// TODO - change with @SystemOnlyProcedure
@SystemProcedure
@Admin
@Procedure(mode = Mode.WRITE)
@Description("CALL apoc.trigger.stop(databaseName, name) | eventually pauses the trigger")
public Stream<TriggerInfo> stop(@Name("databaseName") String databaseName, @Name("name")String name) {
Expand All @@ -99,6 +104,7 @@ public Stream<TriggerInfo> stop(@Name("databaseName") String databaseName, @Name

// TODO - change with @SystemOnlyProcedure
@SystemProcedure
@Admin
@Procedure(mode = Mode.WRITE)
@Description("CALL apoc.trigger.start(databaseName, name) | eventually unpauses the paused trigger")
public Stream<TriggerInfo> start(@Name("databaseName") String databaseName, @Name("name")String name) {
Expand Down
116 changes: 102 additions & 14 deletions core/src/test/java/apoc/trigger/TriggerClusterRoutingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;

Expand All @@ -15,10 +17,14 @@
import java.util.Map;
import java.util.UUID;

import static apoc.trigger.Trigger.DB_NON_WRITER_ERROR;
import static apoc.trigger.TriggerNewProcedures.TRIGGER_NOT_ROUTED_ERROR;
import static apoc.util.TestContainerUtil.testCall;
import static org.junit.Assert.assertEquals;
import static org.neo4j.configuration.GraphDatabaseSettings.DEFAULT_DATABASE_NAME;
import static org.neo4j.configuration.GraphDatabaseSettings.SYSTEM_DATABASE_NAME;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TriggerClusterRoutingTest {
private static TestcontainersCausalCluster cluster;
Expand Down Expand Up @@ -46,42 +52,124 @@ public static void bringDownCluster() {
@Test
public void testTriggerAddAllowedOnlyInSysLeaderMember() {
final String query = "CALL apoc.trigger.add($name, 'RETURN 1', {})";
triggerInSysWriterMemberCommon(query, DEFAULT_DATABASE_NAME);
triggerInSysLeaderMemberCommon(query, DB_NON_WRITER_ERROR, DEFAULT_DATABASE_NAME);
}

@Test
public void testTriggerRemoveAllowedOnlyInSysLeaderMember() {
final String query = "CALL apoc.trigger.remove($name)";
triggerInSysWriterMemberCommon(query, DEFAULT_DATABASE_NAME);
triggerInSysLeaderMemberCommon(query, DB_NON_WRITER_ERROR, DEFAULT_DATABASE_NAME);
}

@Test
public void testTriggerInstallAllowedOnlyInSysWriterMember() {
public void testTriggerInstallAllowedOnlyInSysLeaderMember() {
final String query = "CALL apoc.trigger.install('neo4j', $name, 'RETURN 1', {})";
triggerInSysWriterMemberCommon(query, SYSTEM_DATABASE_NAME);
triggerInSysLeaderMemberCommon(query, TRIGGER_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME);
}

@Test
public void testTriggerDropAllowedOnlyInSysWriterMember() {
public void testTriggerDropAllowedOnlyInSysLeaderMember() {
final String query = "CALL apoc.trigger.drop('neo4j', $name)";
triggerInSysWriterMemberCommon(query, SYSTEM_DATABASE_NAME);
triggerInSysLeaderMemberCommon(query, TRIGGER_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME);
}

private static void triggerInSysWriterMemberCommon(String query, String dbName) {
private static void triggerInSysLeaderMemberCommon(String query, String triggerNotRoutedError, String dbName) {
final List<Neo4jContainerExtension> members = cluster.getClusterMembers();
assertEquals(4, members.size());
for (Neo4jContainerExtension container: members) {
// we skip READ_REPLICA members
final String readReplica = TestcontainersCausalCluster.ClusterInstanceType.READ_REPLICA.toString();
final Driver driver = container.getDriver();
if (readReplica.equals(container.getEnvMap().get("NEO4J_dbms_mode")) || driver == null) {
final Driver driver = getDriverIfNotReplica(container);
if (driver == null) {
continue;
}
System.out.println("TriggerClusterRoutingTest.triggerInSysLeaderMemberCommon");
Session session = driver.session(SessionConfig.forDatabase(dbName));
final String name = UUID.randomUUID().toString();
testCall( session, query,
Map.of("name", name),
row -> assertEquals(name, row.get("name")) );
final String address = container.getEnvMap().get("NEO4J_dbms_connector_bolt_advertised__address");
if (dbIsWriter(session, dbName, address)) {
final String name = UUID.randomUUID().toString();
testCall( session, query,
Map.of("name", name),
row -> assertEquals(name, row.get("name")) );
} else {
try {
testCall(session, query,
Map.of("name", UUID.randomUUID().toString()),
row -> fail("Should fail because of non writer trigger addition"));
} catch (Exception e) {
String errorMsg = e.getMessage();
assertTrue("The actual message is: " + errorMsg, errorMsg.contains(triggerNotRoutedError));
}
}
}
}

@Test
public void testTriggersAllowedOnlyWithAdmin() {
for (Neo4jContainerExtension container: cluster.getClusterMembers()) {
// we skip READ_REPLICA members
final Driver driver = getDriverIfNotReplica(container);
if (driver == null) {
continue;
}
System.out.println("TriggerClusterRoutingTest.testTriggersAllowedOnlyWithAdmin");
final String address = container.getEnvMap().get("NEO4J_dbms_connector_bolt_advertised__address");
try (Session sysSession = driver.session(SessionConfig.forDatabase(SYSTEM_DATABASE_NAME))) {
if (!dbIsWriter(sysSession, SYSTEM_DATABASE_NAME, address)) {
return;
}
sysSession.run("CREATE USER nonadmin SET PASSWORD 'test' SET PASSWORD CHANGE NOT REQUIRED");
}

try (Driver userDriver = GraphDatabase.driver(cluster.getURI(), AuthTokens.basic("nonadmin", "test1234"))) {

try (Session sysUserSession = userDriver.session(SessionConfig.forDatabase(SYSTEM_DATABASE_NAME))) {
failsWithNonAdminUser(sysUserSession, "apoc.trigger.install", "call apoc.trigger.install('neo4j', 'qwe', 'return 1', {})");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.drop", "call apoc.trigger.drop('neo4j', 'qwe')");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.dropAll", "call apoc.trigger.dropAll('neo4j')");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.stop", "call apoc.trigger.stop('neo4j', 'qwe')");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.start", "call apoc.trigger.start('neo4j', 'qwe')");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.show", "call apoc.trigger.show('neo4j')");
}

try (Session neo4jUserSession = userDriver.session(SessionConfig.forDatabase(DEFAULT_DATABASE_NAME))) {
failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.add", "call apoc.trigger.add('abc', 'return 1', {})");
failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.remove", "call apoc.trigger.remove('abc')");
failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.removeAll", "call apoc.trigger.removeAll");
failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.pause", "call apoc.trigger.pause('abc')");
failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.resume", "call apoc.trigger.resume('abc')");
failsWithNonAdminUser(neo4jUserSession, "apoc.trigger.list", "call apoc.trigger.list");
}
}
}
}

private static Driver getDriverIfNotReplica(Neo4jContainerExtension container) {
final String readReplica = TestcontainersCausalCluster.ClusterInstanceType.READ_REPLICA.toString();
final Driver driver = container.getDriver();
if (readReplica.equals(container.getEnvMap().get("NEO4J_dbms_mode")) || driver == null) {
return null;
}
return driver;
}

private static boolean dbIsWriter(Session session, String dbName, String address) {
return session.run( "SHOW DATABASE $dbName WHERE address = $address",
Map.of("dbName", dbName, "address", address) )
.single().get("writer")
.asBoolean();
}

private void failsWithNonAdminUser(Session session, String procName, String query) {
try {
testCall(session, query,
row -> fail("Should fail because of non admin user") );
} catch (Exception e) {
String actual = e.getMessage();
final String expected = String.format("Executing admin procedure '%s' permission has not been granted for user 'nonadmin'",
procName);
assertTrue("Actual error message is: " + actual, actual.contains(expected));
}
}


}

0 comments on commit 26cb203

Please sign in to comment.