Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hxmGFWvF] Only create a TriggerHandler if triggers are enabled. #637

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions common/src/main/java/apoc/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -127,6 +128,7 @@
* @since 24.04.16
*/
public class Util {

public static final Label[] NO_LABELS = new Label[0];
public static final String NODE_COUNT = "MATCH (n) RETURN count(*) as result";
public static final String REL_COUNT = "MATCH ()-->() RETURN count(*) as result";
Expand Down Expand Up @@ -1299,4 +1301,33 @@ public static Iterable<IndexDefinition> getIndexes(Transaction transaction, Rela
.filter(indexDefinition -> indexDefinition.getIndexType() != IndexType.VECTOR)
.toList();
}

public static <T> T withBackOffRetries(Supplier<T> func, long initialTimeout, long upperTimeout, Log log) {
T result = null;
var startTime = System.currentTimeMillis();
var timeout = initialTimeout;
var lastTry = startTime - timeout;

while (true) {
var timeStamp = System.currentTimeMillis();
if (timeStamp - lastTry >= timeout) {
try {
result = func.get();
break;
} catch (Exception e) {
if (timeout >= upperTimeout) {
gem-neo4j marked this conversation as resolved.
Show resolved Hide resolved
Long totalTime = (System.currentTimeMillis() - startTime) / 1000;
if (log.isDebugEnabled()) {
log.debug(String.format(
"Got %s: %s after %s seconds.", e.getClass(), e.getMessage(), totalTime));
}
throw e;
}
}
lastTry = timeStamp;
timeout *= 2;
}
}
return result;
}
}
25 changes: 16 additions & 9 deletions core/src/main/java/apoc/CoreApocGlobalComponents.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package apoc;

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;

import apoc.cypher.CypherInitializer;
import apoc.trigger.TriggerHandler;
import java.util.Collection;
Expand All @@ -34,15 +36,20 @@ public class CoreApocGlobalComponents implements ApocGlobalComponents {

@Override
public Map<String, Lifecycle> getServices(GraphDatabaseAPI db, ApocExtensionFactory.Dependencies dependencies) {
return Collections.singletonMap(
"trigger",
new TriggerHandler(
db,
dependencies.databaseManagementService(),
dependencies.apocConfig(),
dependencies.log().getUserLog(TriggerHandler.class),
dependencies.pools(),
dependencies.scheduler()));
var apocConfig = dependencies.apocConfig();

if (apocConfig.getConfig().getBoolean(APOC_TRIGGER_ENABLED)) {
return Collections.singletonMap(
"trigger",
new TriggerHandler(
db,
dependencies.databaseManagementService(),
apocConfig,
dependencies.log().getUserLog(TriggerHandler.class),
dependencies.pools(),
dependencies.scheduler()));
}
return Collections.emptyMap();
}

@Override
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/apoc/trigger/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package apoc.trigger;

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.ApocConfig.apocConfig;

import apoc.ApocConfig;
import apoc.util.Util;
import java.util.Collections;
import java.util.Map;
Expand All @@ -37,6 +39,10 @@
* @since 20.09.16
*/
public class Trigger {

public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled."
+ " Set 'apoc.trigger.enabled=true' in your apoc.conf file located in the $NEO4J_HOME/conf/ directory.";

public static final String SYS_DB_NON_WRITER_ERROR =
"""
This instance is not allowed to write to the system database.
Expand Down Expand Up @@ -71,6 +77,7 @@ public Stream<TriggerInfo> add(
@Name("statement") String statement,
@Name(value = "selector") Map<String, Object> selector,
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
checkEnabled(apocConfig());
preprocessDeprecatedProcedures();

Util.validateQuery(db, statement);
Expand All @@ -95,6 +102,7 @@ public Stream<TriggerInfo> add(
@Procedure(name = "apoc.trigger.remove", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.drop")
@Description("Removes the given trigger.")
public Stream<TriggerInfo> remove(@Name("name") String name) {
checkEnabled(apocConfig());
preprocessDeprecatedProcedures();

Map<String, Object> removed = triggerHandler.remove(name);
Expand All @@ -115,6 +123,7 @@ public Stream<TriggerInfo> remove(@Name("name") String name) {
@Procedure(name = "apoc.trigger.removeAll", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.dropAll")
@Description("Removes all previously added triggers.")
public Stream<TriggerInfo> removeAll() {
checkEnabled(apocConfig());
preprocessDeprecatedProcedures();

final var removed = triggerHandler.removeAll();
Expand All @@ -128,6 +137,7 @@ public Stream<TriggerInfo> removeAll() {
@Procedure(name = "apoc.trigger.list", mode = Mode.READ)
@Description("Lists all currently installed triggers for the session database.")
public Stream<TriggerInfo> list() {
checkEnabled(apocConfig());
return triggerHandler.list().entrySet().stream()
.map((e) -> new TriggerInfo(
e.getKey(),
Expand All @@ -143,6 +153,7 @@ public Stream<TriggerInfo> list() {
@Procedure(name = "apoc.trigger.pause", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.stop")
@Description("Pauses the given trigger.")
public Stream<TriggerInfo> pause(@Name("name") String name) {
checkEnabled(apocConfig());
preprocessDeprecatedProcedures();

Map<String, Object> paused = triggerHandler.updatePaused(name, true);
Expand All @@ -161,6 +172,7 @@ public Stream<TriggerInfo> pause(@Name("name") String name) {
@Procedure(name = "apoc.trigger.resume", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.start")
@Description("Resumes the given paused trigger.")
public Stream<TriggerInfo> resume(@Name("name") String name) {
checkEnabled(apocConfig());
preprocessDeprecatedProcedures();

Map<String, Object> resume = triggerHandler.updatePaused(name, false);
Expand All @@ -173,4 +185,10 @@ public Stream<TriggerInfo> resume(@Name("name") String name) {
true,
false));
}

private void checkEnabled(ApocConfig apocConfig) {
if (!apocConfig.getConfig().getBoolean(APOC_TRIGGER_ENABLED)) {
throw new RuntimeException(NOT_ENABLED_ERROR);
}
}
}
39 changes: 15 additions & 24 deletions core/src/main/java/apoc/trigger/TriggerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package apoc.trigger;

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.ApocConfig.apocConfig;
import static apoc.SystemLabels.ApocTrigger;

Expand Down Expand Up @@ -79,9 +78,6 @@ private enum Phase {

private final AtomicBoolean registeredWithKernel = new AtomicBoolean(false);

public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled."
+ " Set 'apoc.trigger.enabled=true' in your apoc.conf file located in the $NEO4J_HOME/conf/ directory.";

public TriggerHandler(
GraphDatabaseService db,
DatabaseManagementService databaseManagementService,
Expand All @@ -97,16 +93,6 @@ public TriggerHandler(
this.jobScheduler = jobScheduler;
}

private boolean isEnabled() {
return apocConfig.getBoolean(APOC_TRIGGER_ENABLED);
}

public void checkEnabled() {
if (!isEnabled()) {
throw new RuntimeException(NOT_ENABLED_ERROR);
}
}

public void updateCache() {
try {
doUpdateCache();
Expand Down Expand Up @@ -174,7 +160,6 @@ private void reconcileKernelRegistration() {

public Map<String, Object> add(
String name, String statement, Map<String, Object> selector, Map<String, Object> params) {
checkEnabled();
final var previous = triggersSnapshot.get().get(name);

withSystemDb(tx -> {
Expand All @@ -197,7 +182,6 @@ public Map<String, Object> add(
}

public Map<String, Object> remove(String name) {
checkEnabled();
final var previous = triggersSnapshot.get().get(name);

withSystemDb(tx -> {
Expand All @@ -216,7 +200,6 @@ public Map<String, Object> remove(String name) {
}

public Map<String, Object> updatePaused(String name, boolean paused) {
checkEnabled();
withSystemDb(tx -> {
tx.findNodes(
ApocTrigger,
Expand All @@ -233,7 +216,6 @@ public Map<String, Object> updatePaused(String name, boolean paused) {
}

public Map<String, Map<String, Object>> removeAll() {
checkEnabled();
final var previous = triggersSnapshot.get();
withSystemDb(tx -> {
tx.findNodes(ApocTrigger, SystemPropertyKeys.database.name(), db.databaseName())
Expand All @@ -246,7 +228,6 @@ public Map<String, Map<String, Object>> removeAll() {
}

public Map<String, Map<String, Object>> list() {
checkEnabled();
return triggersSnapshot.get();
}

Expand Down Expand Up @@ -373,11 +354,21 @@ public void stop() {
}

private <T> T withSystemDb(Function<Transaction, T> action) {
try (Transaction tx = apocConfig.getSystemDb().beginTx()) {
T result = action.apply(tx);
tx.commit();
return result;
}
var timeout = 500;

// When the timeout reaches 12 hours, we will have been trying for 24 hours - time to give up
var upperTimeout = 43200000;

return Util.withBackOffRetries(
() -> {
Transaction tx = apocConfig.getSystemDb().beginTx();
T result = action.apply(tx);
tx.commit();
return result;
},
timeout,
upperTimeout,
log);
}

private long getLastUpdate() {
Expand Down
41 changes: 31 additions & 10 deletions core/src/test/java/apoc/trigger/TriggerDisabledTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.ApocConfig.apocConfig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import apoc.util.TestUtil;
Expand Down Expand Up @@ -61,43 +62,63 @@ public static void teardown() {

@Test
public void testTriggerDisabledList() {
assertThrows(
TriggerHandler.NOT_ENABLED_ERROR,
Exception e = assertThrows(
"Expected a runtime error when running apoc.trigger.list with triggers disabled.",
RuntimeException.class,
() -> db.executeTransactionally(
"CALL apoc.trigger.list() YIELD name RETURN name", Map.of(), Result::resultAsString));
assertEquals(expectedDisabledError("apoc.trigger.list"), e.getMessage());
}

@Test
public void testTriggerDisabledAdd() {
assertThrows(
TriggerHandler.NOT_ENABLED_ERROR,
Exception e = assertThrows(
"Expected a runtime error when running apoc.trigger.add with triggers disabled.",
RuntimeException.class,
() -> db.executeTransactionally(
"CALL apoc.trigger.add('test-trigger', 'RETURN 1', {phase: 'before'}) YIELD name RETURN name"));
assertEquals(expectedDisabledError("apoc.trigger.add"), e.getMessage());
}

@Test
public void testTriggerDisabledRemove() {
assertThrows(
TriggerHandler.NOT_ENABLED_ERROR,
Exception e = assertThrows(
"Expected a runtime error when running apoc.trigger.remove with triggers disabled.",
RuntimeException.class,
() -> db.executeTransactionally("CALL apoc.trigger.remove('test-trigger')"));
assertEquals(expectedDisabledError("apoc.trigger.remove"), e.getMessage());
}

@Test
public void testTriggerDisabledRemoveAll() {
Exception e = assertThrows(
"Expected a runtime error when running apoc.trigger.removeAll with triggers disabled.",
RuntimeException.class,
() -> db.executeTransactionally("CALL apoc.trigger.removeAll()"));
assertEquals(expectedDisabledError("apoc.trigger.removeAll"), e.getMessage());
}

@Test
public void testTriggerDisabledResume() {
assertThrows(
TriggerHandler.NOT_ENABLED_ERROR,
Exception e = assertThrows(
"Expected a runtime error when running apoc.trigger.resume with triggers disabled.",
RuntimeException.class,
() -> db.executeTransactionally("CALL apoc.trigger.resume('test-trigger')"));
assertEquals(expectedDisabledError("apoc.trigger.resume"), e.getMessage());
}

@Test
public void testTriggerDisabledPause() {
assertThrows(
TriggerHandler.NOT_ENABLED_ERROR,
Exception e = assertThrows(
"Expected a runtime error when running apoc.trigger.pause with triggers disabled.",
RuntimeException.class,
() -> db.executeTransactionally("CALL apoc.trigger.pause('test-trigger')"));
assertEquals(expectedDisabledError("apoc.trigger.pause"), e.getMessage());
}

private String expectedDisabledError(String procedureName) {
return String.format(
"Failed to invoke procedure `%s`: Caused by: java.lang.RuntimeException: %s",
procedureName, Trigger.NOT_ENABLED_ERROR);
}
}
8 changes: 6 additions & 2 deletions core/src/test/java/apoc/trigger/TriggerRestartTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package apoc.trigger;

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.trigger.TriggerTestUtil.TRIGGER_DEFAULT_REFRESH;
import static apoc.trigger.TriggerTestUtil.awaitTriggerDiscovered;
import static apoc.util.TestUtil.waitDbsAvailable;
import static org.junit.Assert.assertEquals;

import apoc.ApocConfig;
import apoc.util.TestUtil;
import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -54,14 +54,18 @@ public class TriggerRestartTest {
public static final ProvideSystemProperty systemPropertyRule =
new ProvideSystemProperty("apoc.trigger.refresh", String.valueOf(TRIGGER_DEFAULT_REFRESH));

// we cannot set via apocConfig().setProperty(apoc.trigger.enabled, ...) in `@Before`, because is too late
@ClassRule
public static final ProvideSystemProperty systemPropertyRule2 =
new ProvideSystemProperty(APOC_TRIGGER_ENABLED, String.valueOf(true));

@Before
public void setUp() throws IOException {
databaseManagementService =
new TestDatabaseManagementServiceBuilder(store_dir.getRoot().toPath()).build();
db = databaseManagementService.database(GraphDatabaseSettings.DEFAULT_DATABASE_NAME);
sysDb = databaseManagementService.database(GraphDatabaseSettings.SYSTEM_DATABASE_NAME);
waitDbsAvailable(db, sysDb);
ApocConfig.apocConfig().setProperty("apoc.trigger.enabled", "true");
TestUtil.registerProcedure(db, TriggerNewProcedures.class, Trigger.class);
}

Expand Down
Loading
Loading