Skip to content

Commit

Permalink
[gULAAkri] apoc.periodic.repeat doesn't always work (#388)
Browse files Browse the repository at this point in the history
* [gULAAkri] apoc.periodic.repeat doesn't always work

* [gULAAkri] added mockLog
  • Loading branch information
vga91 authored May 16, 2023
1 parent e8b1935 commit c226380
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 4 deletions.
4 changes: 3 additions & 1 deletion common/src/main/java/apoc/periodic/PeriodicUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;
Expand Down Expand Up @@ -269,7 +270,8 @@ public static Stream<JobInfo> submitProc(String name, String statement, Map<Stri
Map<String,Object> params = (Map) config.getOrDefault("params", Collections.emptyMap());
JobInfo info = submitJob(name, () -> {
try {
db.executeTransactionally(statement, params);
// `resultAsString` in order to consume result
db.executeTransactionally(statement, params, Result::resultAsString);
} catch(Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") Stri
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
JobInfo info = schedule(name, () -> {
db.executeTransactionally(statement, params);
// `resultAsString` in order to consume result
db.executeTransactionally(statement, params, Result::resultAsString);
},0,rate);
return Stream.of(info);
}
Expand Down
61 changes: 59 additions & 2 deletions core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

Expand All @@ -48,6 +53,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import java.util.stream.Stream;

Expand All @@ -69,21 +75,72 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.util.Iterables.count;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class PeriodicTest {
public static class MockLogger {
@Context public Log log;

@Procedure("apoc.mockLog")
public void mockLog(@Name("value") String value) {
log.info(value);
}
}

public static final long RUNDOWN_COUNT = 1000;
public static final int BATCH_SIZE = 399;

public static AssertableLogProvider logProvider = new AssertableLogProvider();

@Rule
public DbmsRule db = new ImpermanentDbmsRule();
public DbmsRule db = new ImpermanentDbmsRule(logProvider);

@Before
public void initDb() {
TestUtil.registerProcedure(db, Periodic.class, Schemas.class, Cypher.class, Utils.class);
TestUtil.registerProcedure(db, Periodic.class, Schemas.class, Cypher.class, Utils.class, MockLogger.class);
db.executeTransactionally("call apoc.periodic.list() yield name call apoc.periodic.cancel(name) yield name as name2 return count(*)");
}

@Test
public void testRepeatWithVoidProcedure() {
String logVal = "repeatVoid";
String query = "CALL apoc.periodic.repeat('repeat-1', 'CALL apoc.mockLog($logVal)', 1, {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

@Test
public void testRepeatWithVoidProcedureAndReturn() {
String logVal = "repeatVoidWithReturn";
String query = "CALL apoc.periodic.repeat('repeat-2', 'CALL apoc.mockLog($logVal) RETURN 1', 1, {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

@Test
public void testSubmitWithVoidProcedure() {
String logVal = "submitVoid";
String query = "CALL apoc.periodic.submit('submit-1', 'CALL apoc.mockLog($logVal) RETURN 1', {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

@Test
public void testSubmitWithVoidProcedureAndReturn() {
String logVal = "submitVoidWithReturn";
String query = "CALL apoc.periodic.submit('submit-1', 'CALL apoc.mockLog($logVal)', {params: {logVal: $logVal}})";
testLogIncrease(query, logVal);
}

private void testLogIncrease(String query, String logVal) {
// execute a periodic procedure with `CALL apoc.mockLog(...)` as an inner procedure
db.executeTransactionally(query, Map.of("logVal", logVal));

// check custom log in logProvider
assertEventually(() -> {
String serialize = logProvider.serialize();
return serialize.contains(logVal);
}, (val) -> val, 5L, TimeUnit.SECONDS);

}

@Test
public void testSubmitStatement() throws Exception {
String callList = "CALL apoc.periodic.list()";
Expand Down

0 comments on commit c226380

Please sign in to comment.