Skip to content

Commit

Permalink
[gRNAet7B] Added trigger show procedure (neo4j-contrib/neo4j-apoc-pro…
Browse files Browse the repository at this point in the history
…cedures#3335)

* Fix flaky trigger dbms availability tests
* added apoc.trigger.show proc
* retry to fix flaky test by separating setLastUpdate()
* removed updated output - some code refactoring
* split pr - changes review
* removed inner transaction
* updated documentation
* reset wrong generated docs
* Added admin in trigger.show
* small adoc changes
  • Loading branch information
vga91 committed Dec 15, 2022
1 parent 0435907 commit df9b393
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.function.Consumer;

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.ApocConfig.apocConfig;

public class TriggerHandlerWrite {
public class TriggerHandlerNewProcedures {
public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled." +
" Set 'apoc.trigger.enabled=true' in your apoc.conf file located in the $NEO4J_HOME/conf/ directory.";

public static Map<String, Object> toTriggerInfo(Node node) {
private static Map<String, Object> toTriggerInfo(Node node) {
return node.getAllProperties()
.entrySet().stream()
.filter(e -> !List.of(SystemPropertyKeys.name.name(), SystemPropertyKeys.database.name()).contains(e.getKey()))
.filter(e -> !SystemPropertyKeys.database.name().equals(e.getKey()))
.collect(HashMap::new, // workaround for https://bugs.openjdk.java.net/browse/JDK-8148463
(mapAccumulator, e) -> {
Object value = List.of(SystemPropertyKeys.selector.name(), SystemPropertyKeys.params.name()).contains(e.getKey())
Expand Down Expand Up @@ -53,7 +54,7 @@ public static Map<String, Object> install(String databaseName, String triggerNam
Pair.of(SystemPropertyKeys.name.name(), triggerName));

// we'll return previous trigger info
previous.putAll(TriggerHandlerWrite.toTriggerInfo(node));
previous.putAll(toTriggerInfo(node));

node.setProperty(SystemPropertyKeys.statement.name(), statement);
node.setProperty(SystemPropertyKeys.selector.name(), Util.toJson(selector));
Expand All @@ -72,7 +73,7 @@ public static Map<String, Object> drop(String databaseName, String triggerName)
withSystemDb(tx -> {
getTriggerNodes(databaseName, tx, triggerName)
.forEachRemaining(node -> {
previous.putAll(TriggerHandlerWrite.toTriggerInfo(node));
previous.putAll(toTriggerInfo(node));
node.delete();
});

Expand All @@ -91,7 +92,7 @@ public static Map<String, Object> updatePaused(String databaseName, String name,
node.setProperty( SystemPropertyKeys.paused.name(), paused );

// we'll return previous trigger info
result.putAll(TriggerHandlerWrite.toTriggerInfo(node));
result.putAll(toTriggerInfo(node));
});

setLastUpdate(databaseName, tx);
Expand All @@ -109,7 +110,7 @@ public static Map<String, Object> dropAll(String databaseName) {
String triggerName = (String) node.getProperty(SystemPropertyKeys.name.name());

// we'll return previous trigger info
previous.put(triggerName, TriggerHandlerWrite.toTriggerInfo(node));
previous.put(triggerName, toTriggerInfo(node));
node.delete();
});
setLastUpdate(databaseName, tx);
Expand All @@ -118,6 +119,13 @@ public static Map<String, Object> dropAll(String databaseName) {
return previous;
}

public static List<Map<String, Object>> getTriggerNodesList(String databaseName, Transaction tx) {
return getTriggerNodes(databaseName, tx)
.stream()
.map(TriggerHandlerNewProcedures::toTriggerInfo)
.collect(Collectors.toList());
}

public static ResourceIterator<Node> getTriggerNodes(String databaseName, Transaction tx) {
return getTriggerNodes(databaseName, tx, null);
}
Expand Down
64 changes: 40 additions & 24 deletions core/src/main/java/apoc/trigger/TriggerNewProcedures.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package apoc.trigger;

import apoc.SystemPropertyKeys;
import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.api.procedure.SystemProcedure;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
Expand Down Expand Up @@ -38,21 +41,28 @@ public TriggerInfo(String name, String query, Map<String, Object> selector, bool

public TriggerInfo( String name, String query, Map<String,Object> selector, Map<String,Object> params, boolean installed, boolean paused )
{
this.name = name;
this.query = query;
this.selector = selector;
this(name, query, selector, installed, paused);
this.params = params;
this.installed = installed;
this.paused = paused;
}

public static TriggerInfo from(Map<String, Object> mapInfo, boolean installed) {
return new TriggerInfo((String) mapInfo.get(SystemPropertyKeys.name.name()),
(String) mapInfo.get(SystemPropertyKeys.statement.name()),
(Map<String, Object>) mapInfo.get(SystemPropertyKeys.selector.name()),
(Map<String, Object>) mapInfo.get(SystemPropertyKeys.params.name()),
installed,
(boolean) mapInfo.getOrDefault(SystemPropertyKeys.paused.name(), true));
}
}

@Context public GraphDatabaseAPI db;

@Context public Log log;

@Context public Transaction tx;

private void checkInSystemWriter() {
TriggerHandlerWrite.checkEnabled();
TriggerHandlerNewProcedures.checkEnabled();
// routing check
if (!db.databaseName().equals(SYSTEM_DATABASE_NAME) || !Util.isWriteableInstance(db)) {
throw new RuntimeException(TRIGGER_NOT_ROUTED_ERROR);
Expand All @@ -64,7 +74,7 @@ public TriggerInfo toTriggerInfo(Map.Entry<String, Object> e) {
if (e.getValue() instanceof Map) {
try {
Map<String, Object> value = (Map<String, Object>) e.getValue();
return new TriggerInfo(name, (String) value.get("statement"), (Map<String, Object>) value.get("selector"), (Map<String, Object>) value.get("params"), false, false);
return TriggerInfo.from(value, false);
} catch(Exception ex) {
return new TriggerInfo(name, ex.getMessage(), null, false, false);
}
Expand All @@ -80,10 +90,10 @@ public Stream<TriggerInfo> install(@Name("databaseName") String databaseName, @N
checkInSystemWriter();

Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
Map<String, Object> removed = TriggerHandlerWrite.install(databaseName, name, statement, selector, params);
if (!removed.isEmpty()) {
Map<String, Object> removed = TriggerHandlerNewProcedures.install(databaseName, name, statement, selector, params);
if (removed.containsKey(SystemPropertyKeys.statement.name())) {
return Stream.of(
new TriggerInfo( name, (String)removed.get( "statement"), (Map<String, Object>) removed.get( "selector"), (Map<String, Object>) removed.get( "params"), false, false),
TriggerInfo.from(removed, false),
new TriggerInfo( name, statement, selector, params, true, false));
}
return Stream.of(new TriggerInfo( name, statement, selector, params, true, false));
Expand All @@ -96,11 +106,11 @@ public Stream<TriggerInfo> install(@Name("databaseName") String databaseName, @N
@Description("CALL apoc.trigger.drop(databaseName, name) | remove previously added trigger, returns trigger information")
public Stream<TriggerInfo> drop(@Name("databaseName") String databaseName, @Name("name")String name) {
checkInSystemWriter();
Map<String, Object> removed = TriggerHandlerWrite.drop(databaseName, name);
if (removed.isEmpty()) {
Map<String, Object> removed = TriggerHandlerNewProcedures.drop(databaseName, name);
if (!removed.containsKey(SystemPropertyKeys.statement.name())) {
return Stream.of(new TriggerInfo(name, null, null, false, false));
}
return Stream.of(new TriggerInfo(name,(String)removed.get("statement"), (Map<String, Object>) removed.get("selector"), (Map<String, Object>) removed.get("params"),false, false));
return Stream.of(TriggerInfo.from(removed, false));
}


Expand All @@ -110,7 +120,7 @@ public Stream<TriggerInfo> drop(@Name("databaseName") String databaseName, @Name
@Description("CALL apoc.trigger.dropAll(databaseName) | removes all previously added trigger, returns trigger information")
public Stream<TriggerInfo> dropAll(@Name("databaseName") String databaseName) {
checkInSystemWriter();
Map<String, Object> removed = TriggerHandlerWrite.dropAll(databaseName);
Map<String, Object> removed = TriggerHandlerNewProcedures.dropAll(databaseName);
return removed.entrySet().stream().map(this::toTriggerInfo);
}

Expand All @@ -120,12 +130,9 @@ public Stream<TriggerInfo> dropAll(@Name("databaseName") String databaseName) {
@Description("CALL apoc.trigger.stop(databaseName, name) | it pauses the trigger")
public Stream<TriggerInfo> stop(@Name("databaseName") String databaseName, @Name("name")String name) {
checkInSystemWriter();
Map<String, Object> paused = TriggerHandlerWrite.updatePaused(databaseName, name, true);
Map<String, Object> paused = TriggerHandlerNewProcedures.updatePaused(databaseName, name, true);

return Stream.of(new TriggerInfo(name,
(String)paused.get("statement"),
(Map<String,Object>) paused.get("selector"),
(Map<String,Object>) paused.get("params"),true, true));
return Stream.of(TriggerInfo.from(paused,true));
}

// TODO - change with @SystemOnlyProcedure
Expand All @@ -134,12 +141,21 @@ public Stream<TriggerInfo> stop(@Name("databaseName") String databaseName, @Name
@Description("CALL apoc.trigger.start(databaseName, name) | it resumes the paused trigger")
public Stream<TriggerInfo> start(@Name("databaseName") String databaseName, @Name("name")String name) {
checkInSystemWriter();
Map<String, Object> resume = TriggerHandlerWrite.updatePaused(databaseName, name, false);
Map<String, Object> resume = TriggerHandlerNewProcedures.updatePaused(databaseName, name, false);

return Stream.of(new TriggerInfo(name,
(String)resume.get("statement"),
(Map<String,Object>) resume.get("selector"),
(Map<String,Object>) resume.get("params"),true, false));
return Stream.of(TriggerInfo.from(resume, true));
}

// TODO - change with @SystemOnlyProcedure
@SystemProcedure
@Procedure(mode = Mode.READ)
@Description("CALL apoc.trigger.show(databaseName) | it lists all eventually installed triggers for a database")
public Stream<TriggerInfo> show(@Name("databaseName") String databaseName) {
checkInSystemWriter();

return TriggerHandlerNewProcedures.getTriggerNodesList(databaseName, tx)
.stream()
.map(trigger -> TriggerInfo.from(trigger, true)
);
}
}
30 changes: 30 additions & 0 deletions core/src/test/java/apoc/trigger/TriggerNewProceduresTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static apoc.util.TestUtil.testCallCount;
import static apoc.util.TestUtil.testCallCountEventually;
import static apoc.util.TestUtil.testCallEventually;
import static apoc.util.TestUtil.testResult;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -503,6 +504,35 @@ public void testDeleteRelationships() {
//
// new test cases
//

@Test
public void testTriggerShow() {
String name = "test-show1";
String name2 = "test-show2";
String query = "MATCH (c:TestShow) SET c.count = 1";

testCall(sysDb, "CALL apoc.trigger.install('neo4j', $name, $query,{}) YIELD name",
map("query", query, "name", name),
r -> assertEquals(name, r.get("name")));

testCall(sysDb, "CALL apoc.trigger.install('neo4j', $name, $query,{}) YIELD name",
map("query", query, "name", name2),
r -> assertEquals(name2, r.get("name")));

// not updated
testResult(sysDb, "CALL apoc.trigger.show('neo4j') " +
"YIELD name, query RETURN * ORDER BY name",
map("query", query, "name", name),
res -> {
Map<String, Object> row = res.next();
assertEquals(name, row.get("name"));
assertEquals(query, row.get("query"));
row = res.next();
assertEquals(name2, row.get("name"));
assertEquals(query, row.get("query"));
assertFalse(res.hasNext());
});
}

@Test
public void testInstallTriggerInUserDb() {
Expand Down

0 comments on commit df9b393

Please sign in to comment.