diff --git a/.gitignore b/.gitignore index 21f944c7cd..67d1dd0eb6 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ bin progress.csv logs/ store_lock +.hprof \ No newline at end of file diff --git a/common/src/main/java/apoc/Pools.java b/common/src/main/java/apoc/Pools.java index 079556cb83..ba1c035fb2 100644 --- a/common/src/main/java/apoc/Pools.java +++ b/common/src/main/java/apoc/Pools.java @@ -1,6 +1,6 @@ package apoc; -import apoc.periodic.Periodic; +import apoc.periodic.PeriodicUtils; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Transaction; import org.neo4j.kernel.api.procedure.GlobalProcedures; @@ -40,7 +40,7 @@ public class Pools extends LifecycleAdapter { private ScheduledExecutorService scheduledExecutorService; private ExecutorService defaultExecutorService; - private final Map jobList = new ConcurrentHashMap<>(); + private final Map jobList = new ConcurrentHashMap<>(); public Pools(LogService log, GlobalProcedures globalProceduresRegistry, ApocConfig apocConfig) { @@ -78,8 +78,8 @@ public void init() { ); scheduledExecutorService.scheduleAtFixedRate(() -> { - for (Iterator> it = jobList.entrySet().iterator(); it.hasNext(); ) { - Map.Entry entry = it.next(); + for (Iterator> it = jobList.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); if (entry.getValue().isDone() || entry.getValue().isCancelled()) it.remove(); } },10,10,TimeUnit.SECONDS); @@ -109,7 +109,7 @@ public ExecutorService getDefaultExecutorService() { return defaultExecutorService; } - public Map getJobList() { + public Map getJobList() { return jobList; } diff --git a/common/src/main/java/apoc/periodic/PeriodicUtils.java b/common/src/main/java/apoc/periodic/PeriodicUtils.java index c3abfeb0f5..4c889138d2 100644 --- a/common/src/main/java/apoc/periodic/PeriodicUtils.java +++ b/common/src/main/java/apoc/periodic/PeriodicUtils.java @@ -2,6 +2,7 @@ import apoc.Pools; import apoc.util.Util; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.QueryStatistics; import org.neo4j.graphdb.Transaction; @@ -16,12 +17,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.ToLongFunction; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; +import static apoc.util.Util.merge; public class PeriodicUtils { @@ -29,6 +33,131 @@ private PeriodicUtils() { } + public static class JobInfo { + public final String name; + public long delay; + public long rate; + public boolean done; + public boolean cancelled; + + public JobInfo(String name) { + this.name = name; + } + + public JobInfo(String name, long delay, long rate) { + this.name = name; + this.delay = delay; + this.rate = rate; + } + + public JobInfo update(Future future) { + this.done = future.isDone(); + this.cancelled = future.isCancelled(); + return this; + } + + @Override + public boolean equals(Object o) { + return this == o || o instanceof JobInfo && name.equals(((JobInfo) o).name); + } + + @Override + public int hashCode() { + return name.hashCode(); + } + } + + + + static abstract class ExecuteBatch implements Function { + + protected TerminationGuard terminationGuard; + protected BatchAndTotalCollector collector; + protected List> batch; + protected BiFunction, QueryStatistics> consumer; + + ExecuteBatch(TerminationGuard terminationGuard, + BatchAndTotalCollector collector, + List> batch, + BiFunction, QueryStatistics> consumer) { + this.terminationGuard = terminationGuard; + this.collector = collector; + this.batch = batch; + this.consumer = consumer; + } + + public void release() { + terminationGuard = null; + collector = null; + batch = null; + consumer = null; + } + } + + static class ListExecuteBatch extends ExecuteBatch { + + ListExecuteBatch(TerminationGuard terminationGuard, + BatchAndTotalCollector collector, + List> batch, + BiFunction, QueryStatistics> consumer) { + super(terminationGuard, collector, batch, consumer); + } + + @Override + public final Long apply(Transaction txInThread) { + if (Util.transactionIsTerminated(terminationGuard)) return 0L; + Map params = Util.map("_count", collector.getCount(), "_batch", batch); + return executeAndReportErrors(txInThread, consumer, params, batch, batch.size(), null, collector); + } + } + + static class OneByOneExecuteBatch extends ExecuteBatch { + + OneByOneExecuteBatch(TerminationGuard terminationGuard, + BatchAndTotalCollector collector, + List> batch, + BiFunction, QueryStatistics> consumer) { + super(terminationGuard, collector, batch, consumer); + } + + @Override + public final Long apply(Transaction txInThread) { + if (Util.transactionIsTerminated(terminationGuard)) return 0L; + AtomicLong localCount = new AtomicLong(collector.getCount()); + return batch.stream().mapToLong( + p -> { + if (localCount.get() % 1000 == 0 && Util.transactionIsTerminated(terminationGuard)) { + return 0; + } + Map params = merge(p, Util.map("_count", localCount.get(), "_batch", batch)); + return executeAndReportErrors(txInThread, consumer, params, batch, 1, localCount, collector); + }).sum(); + } + } + + private static long executeAndReportErrors(Transaction tx, BiFunction, QueryStatistics> consumer, Map params, + List> batch, int returnValue, AtomicLong localCount, BatchAndTotalCollector collector) { + try { + QueryStatistics statistics = consumer.apply(tx, params); + if (localCount!=null) { + localCount.getAndIncrement(); + } + collector.updateStatistics(statistics); + return returnValue; + } catch (Exception e) { + collector.incrementFailedOps(batch.size()); + collector.amendFailedParamsMap(batch); + recordError(collector.getOperationErrors(), e); + throw e; + } + } + + public static void recordError(Map executionErrors, Exception e) { + String msg = ExceptionUtils.getRootCause(e).getMessage(); + // String msg = ExceptionUtils.getThrowableList(e).stream().map(Throwable::getMessage).collect(Collectors.joining(",")) + executionErrors.compute(msg, (s, i) -> i == null ? 1 : i + 1); + } + public static Pair prepareInnerStatement(String cypherAction, BatchMode batchMode, List columns, String iteratorVariableName) { String names = columns.stream().map(Util::quote).collect(Collectors.joining("|")); boolean withCheck = regNoCaseMultiLine("[{$](" + names + ")\\}?\\s+AS\\s+").matcher(cypherAction).find(); @@ -75,10 +204,10 @@ public static Stream iterateAndExecuteBatchedInSeparateThre if (log.isDebugEnabled()) log.debug("Execute, in periodic iteration with id %s, no %d batch size ", periodicId, batchsize); List> batch = Util.take(iterator, batchsize); final long currentBatchSize = batch.size(); - Periodic.ExecuteBatch executeBatch = + ExecuteBatch executeBatch = iterateList ? - new Periodic.ListExecuteBatch(terminationGuard, collector, batch, consumer) : - new Periodic.OneByOneExecuteBatch(terminationGuard, collector, batch, consumer); + new ListExecuteBatch(terminationGuard, collector, batch, consumer) : + new OneByOneExecuteBatch(terminationGuard, collector, batch, consumer); futures.add(Util.inTxFuture(log, pool, diff --git a/common/src/main/java/apoc/periodic/Periodic.java b/core/src/main/java/apoc/periodic/Periodic.java similarity index 75% rename from common/src/main/java/apoc/periodic/Periodic.java rename to core/src/main/java/apoc/periodic/Periodic.java index af816c2ba7..7ebe9b0d93 100644 --- a/common/src/main/java/apoc/periodic/Periodic.java +++ b/core/src/main/java/apoc/periodic/Periodic.java @@ -2,9 +2,8 @@ import apoc.Pools; import apoc.util.Util; -import org.apache.commons.lang3.exception.ExceptionUtils; +import apoc.periodic.PeriodicUtils.JobInfo; import org.neo4j.graphdb.GraphDatabaseService; -import org.neo4j.graphdb.QueryStatistics; import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.schema.ConstraintDefinition; @@ -19,13 +18,11 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiFunction; -import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; +import static apoc.periodic.PeriodicUtils.recordError; import static apoc.util.Util.merge; public class Periodic { @@ -112,12 +109,6 @@ public Stream commit(@Name("statement") String statement, @Name(v return Stream.of(new RundownResult(total,executions, timeTaken, batches.get(),failedBatches.get(),batchErrors, failedCommits.get(), commitErrors, wasTerminated)); } - private static void recordError(Map executionErrors, Exception e) { - String msg = ExceptionUtils.getRootCause(e).getMessage(); - // String msg = ExceptionUtils.getThrowableList(e).stream().map(Throwable::getMessage).collect(Collectors.joining(",")) - executionErrors.compute(msg, (s, i) -> i == null ? 1 : i + 1); - } - public static class RundownResult { public final long updates; public final long executions; @@ -318,124 +309,6 @@ private static String prependQueryOption(String query, String cypherOption) { : completePrefix + query; } - - static abstract class ExecuteBatch implements Function { - - protected TerminationGuard terminationGuard; - protected BatchAndTotalCollector collector; - protected List> batch; - protected BiFunction, QueryStatistics> consumer; - - ExecuteBatch(TerminationGuard terminationGuard, - BatchAndTotalCollector collector, - List> batch, - BiFunction, QueryStatistics> consumer) { - this.terminationGuard = terminationGuard; - this.collector = collector; - this.batch = batch; - this.consumer = consumer; - } - - public void release() { - terminationGuard = null; - collector = null; - batch = null; - consumer = null; - } - } - - static class ListExecuteBatch extends ExecuteBatch { - - ListExecuteBatch(TerminationGuard terminationGuard, - BatchAndTotalCollector collector, - List> batch, - BiFunction, QueryStatistics> consumer) { - super(terminationGuard, collector, batch, consumer); - } - - @Override - public final Long apply(Transaction txInThread) { - if (Util.transactionIsTerminated(terminationGuard)) return 0L; - Map params = Util.map("_count", collector.getCount(), "_batch", batch); - return executeAndReportErrors(txInThread, consumer, params, batch, batch.size(), null, collector); - } - } - - static class OneByOneExecuteBatch extends ExecuteBatch { - - OneByOneExecuteBatch(TerminationGuard terminationGuard, - BatchAndTotalCollector collector, - List> batch, - BiFunction, QueryStatistics> consumer) { - super(terminationGuard, collector, batch, consumer); - } - - @Override - public final Long apply(Transaction txInThread) { - if (Util.transactionIsTerminated(terminationGuard)) return 0L; - AtomicLong localCount = new AtomicLong(collector.getCount()); - return batch.stream().mapToLong( - p -> { - if (localCount.get() % 1000 == 0 && Util.transactionIsTerminated(terminationGuard)) { - return 0; - } - Map params = merge(p, Util.map("_count", localCount.get(), "_batch", batch)); - return executeAndReportErrors(txInThread, consumer, params, batch, 1, localCount, collector); - }).sum(); - } - } - - private static long executeAndReportErrors(Transaction tx, BiFunction, QueryStatistics> consumer, Map params, - List> batch, int returnValue, AtomicLong localCount, BatchAndTotalCollector collector) { - try { - QueryStatistics statistics = consumer.apply(tx, params); - if (localCount!=null) { - localCount.getAndIncrement(); - } - collector.updateStatistics(statistics); - return returnValue; - } catch (Exception e) { - collector.incrementFailedOps(batch.size()); - collector.amendFailedParamsMap(batch); - recordError(collector.getOperationErrors(), e); - throw e; - } - } - - public static class JobInfo { - public final String name; - public long delay; - public long rate; - public boolean done; - public boolean cancelled; - - public JobInfo(String name) { - this.name = name; - } - - public JobInfo(String name, long delay, long rate) { - this.name = name; - this.delay = delay; - this.rate = rate; - } - - public JobInfo update(Future future) { - this.done = future.isDone(); - this.cancelled = future.isCancelled(); - return this; - } - - @Override - public boolean equals(Object o) { - return this == o || o instanceof JobInfo && name.equals(((JobInfo) o).name); - } - - @Override - public int hashCode() { - return name.hashCode(); - } - } - private class Countdown implements Runnable { private final String name; private final String statement; diff --git a/common/src/main/java/apoc/refactor/GraphRefactoring.java b/core/src/main/java/apoc/refactor/GraphRefactoring.java similarity index 100% rename from common/src/main/java/apoc/refactor/GraphRefactoring.java rename to core/src/main/java/apoc/refactor/GraphRefactoring.java diff --git a/common/src/main/java/apoc/refactor/rename/Rename.java b/core/src/main/java/apoc/refactor/rename/Rename.java similarity index 100% rename from common/src/main/java/apoc/refactor/rename/Rename.java rename to core/src/main/java/apoc/refactor/rename/Rename.java diff --git a/common/src/test/java/apoc/periodic/BatchModeTest.java b/core/src/test/java/apoc/periodic/BatchModeTest.java similarity index 100% rename from common/src/test/java/apoc/periodic/BatchModeTest.java rename to core/src/test/java/apoc/periodic/BatchModeTest.java diff --git a/common/src/test/java/apoc/periodic/PeriodicTest.java b/core/src/test/java/apoc/periodic/PeriodicTest.java similarity index 100% rename from common/src/test/java/apoc/periodic/PeriodicTest.java rename to core/src/test/java/apoc/periodic/PeriodicTest.java diff --git a/common/src/test/java/apoc/periodic/PeriodicUtilsTest.java b/core/src/test/java/apoc/periodic/PeriodicUtilsTest.java similarity index 100% rename from common/src/test/java/apoc/periodic/PeriodicUtilsTest.java rename to core/src/test/java/apoc/periodic/PeriodicUtilsTest.java diff --git a/common/src/test/java/apoc/refactor/GraphRefactoringTest.java b/core/src/test/java/apoc/refactor/GraphRefactoringTest.java similarity index 100% rename from common/src/test/java/apoc/refactor/GraphRefactoringTest.java rename to core/src/test/java/apoc/refactor/GraphRefactoringTest.java diff --git a/common/src/test/java/apoc/refactor/rename/RenameTest.java b/core/src/test/java/apoc/refactor/rename/RenameTest.java similarity index 100% rename from common/src/test/java/apoc/refactor/rename/RenameTest.java rename to core/src/test/java/apoc/refactor/rename/RenameTest.java diff --git a/common/src/test/java/apoc/refactor/util/PropertiesManagerTest.java b/core/src/test/java/apoc/refactor/util/PropertiesManagerTest.java similarity index 100% rename from common/src/test/java/apoc/refactor/util/PropertiesManagerTest.java rename to core/src/test/java/apoc/refactor/util/PropertiesManagerTest.java diff --git a/full/build.gradle b/full/build.gradle index 5aecc8a7b5..ed6634a4bd 100644 --- a/full/build.gradle +++ b/full/build.gradle @@ -46,8 +46,10 @@ shadowJar { dependencies { implementation project(":common") - apt project(":processor") + testImplementation project(":common").sourceSets.test.output testImplementation project(':test-utils') + testImplementation project(":core") + apt project(":processor") compileOnly "org.jetbrains.kotlin:kotlin-stdlib" testImplementation "org.jetbrains.kotlin:kotlin-stdlib" diff --git a/full/src/test/java/apoc/ApocSplitTest.java b/full/src/test/java/apoc/ApocSplitTest.java index 71b8f14f61..a5c450478b 100644 --- a/full/src/test/java/apoc/ApocSplitTest.java +++ b/full/src/test/java/apoc/ApocSplitTest.java @@ -17,14 +17,6 @@ */ public class ApocSplitTest { public static final List PROCEDURES_FROM_CORE = List.of( - "apoc.periodic.truncate", - "apoc.periodic.list", - "apoc.periodic.commit", - "apoc.periodic.cancel", - "apoc.periodic.submit", - "apoc.periodic.repeat", - "apoc.periodic.countdown", - "apoc.periodic.iterate", "apoc.trigger.add", "apoc.trigger.remove", "apoc.trigger.removeAll", @@ -171,24 +163,6 @@ public class ApocSplitTest { "apoc.warmup.run", "apoc.stats.degrees", "apoc.help", - "apoc.refactor.rename.label", - "apoc.refactor.rename.type", - "apoc.refactor.rename.nodeProperty", - "apoc.refactor.rename.typeProperty", - "apoc.refactor.extractNode", - "apoc.refactor.collapseNode", - "apoc.refactor.cloneNodes", - "apoc.refactor.cloneSubgraphFromPaths", - "apoc.refactor.cloneSubgraph", - "apoc.refactor.mergeNodes", - "apoc.refactor.mergeRelationships", - "apoc.refactor.setType", - "apoc.refactor.to", - "apoc.refactor.invert", - "apoc.refactor.from", - "apoc.refactor.normalizeAsBoolean", - "apoc.refactor.categorize", - "apoc.refactor.deleteAndReconnect", "apoc.convert.setJsonProperty", "apoc.convert.toTree", "apoc.neighbors.tohop",