Skip to content

Commit

Permalink
NO AUTO Strips refactor and periodic from full
Browse files Browse the repository at this point in the history
  • Loading branch information
ncordon committed Aug 2, 2022
1 parent 5a99079 commit b50f00b
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 164 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ bin
progress.csv
logs/
store_lock
.hprof
10 changes: 5 additions & 5 deletions common/src/main/java/apoc/Pools.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package apoc;

import apoc.periodic.Periodic;
import apoc.periodic.PeriodicUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.api.procedure.GlobalProcedures;
Expand Down Expand Up @@ -40,7 +40,7 @@ public class Pools extends LifecycleAdapter {
private ScheduledExecutorService scheduledExecutorService;
private ExecutorService defaultExecutorService;

private final Map<Periodic.JobInfo,Future> jobList = new ConcurrentHashMap<>();
private final Map<PeriodicUtils.JobInfo,Future> jobList = new ConcurrentHashMap<>();

public Pools(LogService log, GlobalProcedures globalProceduresRegistry, ApocConfig apocConfig) {

Expand Down Expand Up @@ -78,8 +78,8 @@ public void init() {
);

scheduledExecutorService.scheduleAtFixedRate(() -> {
for (Iterator<Map.Entry<Periodic.JobInfo, Future>> it = jobList.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<Periodic.JobInfo, Future> entry = it.next();
for (Iterator<Map.Entry<PeriodicUtils.JobInfo, Future>> it = jobList.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<PeriodicUtils.JobInfo, Future> entry = it.next();
if (entry.getValue().isDone() || entry.getValue().isCancelled()) it.remove();
}
},10,10,TimeUnit.SECONDS);
Expand Down Expand Up @@ -109,7 +109,7 @@ public ExecutorService getDefaultExecutorService() {
return defaultExecutorService;
}

public Map<Periodic.JobInfo, Future> getJobList() {
public Map<PeriodicUtils.JobInfo, Future> getJobList() {
return jobList;
}

Expand Down
135 changes: 132 additions & 3 deletions common/src/main/java/apoc/periodic/PeriodicUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import apoc.Pools;
import apoc.util.Util;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Transaction;
Expand All @@ -16,19 +17,147 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static apoc.util.Util.merge;

public class PeriodicUtils {

private PeriodicUtils() {

}

public static class JobInfo {
public final String name;
public long delay;
public long rate;
public boolean done;
public boolean cancelled;

public JobInfo(String name) {
this.name = name;
}

public JobInfo(String name, long delay, long rate) {
this.name = name;
this.delay = delay;
this.rate = rate;
}

public JobInfo update(Future future) {
this.done = future.isDone();
this.cancelled = future.isCancelled();
return this;
}

@Override
public boolean equals(Object o) {
return this == o || o instanceof JobInfo && name.equals(((JobInfo) o).name);
}

@Override
public int hashCode() {
return name.hashCode();
}
}



static abstract class ExecuteBatch implements Function<Transaction, Long> {

protected TerminationGuard terminationGuard;
protected BatchAndTotalCollector collector;
protected List<Map<String,Object>> batch;
protected BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer;

ExecuteBatch(TerminationGuard terminationGuard,
BatchAndTotalCollector collector,
List<Map<String, Object>> batch,
BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
this.terminationGuard = terminationGuard;
this.collector = collector;
this.batch = batch;
this.consumer = consumer;
}

public void release() {
terminationGuard = null;
collector = null;
batch = null;
consumer = null;
}
}

static class ListExecuteBatch extends ExecuteBatch {

ListExecuteBatch(TerminationGuard terminationGuard,
BatchAndTotalCollector collector,
List<Map<String, Object>> batch,
BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
super(terminationGuard, collector, batch, consumer);
}

@Override
public final Long apply(Transaction txInThread) {
if (Util.transactionIsTerminated(terminationGuard)) return 0L;
Map<String, Object> params = Util.map("_count", collector.getCount(), "_batch", batch);
return executeAndReportErrors(txInThread, consumer, params, batch, batch.size(), null, collector);
}
}

static class OneByOneExecuteBatch extends ExecuteBatch {

OneByOneExecuteBatch(TerminationGuard terminationGuard,
BatchAndTotalCollector collector,
List<Map<String, Object>> batch,
BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
super(terminationGuard, collector, batch, consumer);
}

@Override
public final Long apply(Transaction txInThread) {
if (Util.transactionIsTerminated(terminationGuard)) return 0L;
AtomicLong localCount = new AtomicLong(collector.getCount());
return batch.stream().mapToLong(
p -> {
if (localCount.get() % 1000 == 0 && Util.transactionIsTerminated(terminationGuard)) {
return 0;
}
Map<String, Object> params = merge(p, Util.map("_count", localCount.get(), "_batch", batch));
return executeAndReportErrors(txInThread, consumer, params, batch, 1, localCount, collector);
}).sum();
}
}

private static long executeAndReportErrors(Transaction tx, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer, Map<String, Object> params,
List<Map<String, Object>> batch, int returnValue, AtomicLong localCount, BatchAndTotalCollector collector) {
try {
QueryStatistics statistics = consumer.apply(tx, params);
if (localCount!=null) {
localCount.getAndIncrement();
}
collector.updateStatistics(statistics);
return returnValue;
} catch (Exception e) {
collector.incrementFailedOps(batch.size());
collector.amendFailedParamsMap(batch);
recordError(collector.getOperationErrors(), e);
throw e;
}
}

public static void recordError(Map<String, Long> executionErrors, Exception e) {
String msg = ExceptionUtils.getRootCause(e).getMessage();
// String msg = ExceptionUtils.getThrowableList(e).stream().map(Throwable::getMessage).collect(Collectors.joining(","))
executionErrors.compute(msg, (s, i) -> i == null ? 1 : i + 1);
}

public static Pair<String,Boolean> prepareInnerStatement(String cypherAction, BatchMode batchMode, List<String> columns, String iteratorVariableName) {
String names = columns.stream().map(Util::quote).collect(Collectors.joining("|"));
boolean withCheck = regNoCaseMultiLine("[{$](" + names + ")\\}?\\s+AS\\s+").matcher(cypherAction).find();
Expand Down Expand Up @@ -75,10 +204,10 @@ public static Stream<BatchAndTotalResult> iterateAndExecuteBatchedInSeparateThre
if (log.isDebugEnabled()) log.debug("Execute, in periodic iteration with id %s, no %d batch size ", periodicId, batchsize);
List<Map<String,Object>> batch = Util.take(iterator, batchsize);
final long currentBatchSize = batch.size();
Periodic.ExecuteBatch executeBatch =
ExecuteBatch executeBatch =
iterateList ?
new Periodic.ListExecuteBatch(terminationGuard, collector, batch, consumer) :
new Periodic.OneByOneExecuteBatch(terminationGuard, collector, batch, consumer);
new ListExecuteBatch(terminationGuard, collector, batch, consumer) :
new OneByOneExecuteBatch(terminationGuard, collector, batch, consumer);

futures.add(Util.inTxFuture(log,
pool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import apoc.Pools;
import apoc.util.Util;
import org.apache.commons.lang3.exception.ExceptionUtils;
import apoc.periodic.PeriodicUtils.JobInfo;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.schema.ConstraintDefinition;
Expand All @@ -19,13 +18,11 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static apoc.periodic.PeriodicUtils.recordError;
import static apoc.util.Util.merge;

public class Periodic {
Expand Down Expand Up @@ -112,12 +109,6 @@ public Stream<RundownResult> commit(@Name("statement") String statement, @Name(v
return Stream.of(new RundownResult(total,executions, timeTaken, batches.get(),failedBatches.get(),batchErrors, failedCommits.get(), commitErrors, wasTerminated));
}

private static void recordError(Map<String, Long> executionErrors, Exception e) {
String msg = ExceptionUtils.getRootCause(e).getMessage();
// String msg = ExceptionUtils.getThrowableList(e).stream().map(Throwable::getMessage).collect(Collectors.joining(","))
executionErrors.compute(msg, (s, i) -> i == null ? 1 : i + 1);
}

public static class RundownResult {
public final long updates;
public final long executions;
Expand Down Expand Up @@ -318,124 +309,6 @@ private static String prependQueryOption(String query, String cypherOption) {
: completePrefix + query;
}


static abstract class ExecuteBatch implements Function<Transaction, Long> {

protected TerminationGuard terminationGuard;
protected BatchAndTotalCollector collector;
protected List<Map<String,Object>> batch;
protected BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer;

ExecuteBatch(TerminationGuard terminationGuard,
BatchAndTotalCollector collector,
List<Map<String, Object>> batch,
BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
this.terminationGuard = terminationGuard;
this.collector = collector;
this.batch = batch;
this.consumer = consumer;
}

public void release() {
terminationGuard = null;
collector = null;
batch = null;
consumer = null;
}
}

static class ListExecuteBatch extends ExecuteBatch {

ListExecuteBatch(TerminationGuard terminationGuard,
BatchAndTotalCollector collector,
List<Map<String, Object>> batch,
BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
super(terminationGuard, collector, batch, consumer);
}

@Override
public final Long apply(Transaction txInThread) {
if (Util.transactionIsTerminated(terminationGuard)) return 0L;
Map<String, Object> params = Util.map("_count", collector.getCount(), "_batch", batch);
return executeAndReportErrors(txInThread, consumer, params, batch, batch.size(), null, collector);
}
}

static class OneByOneExecuteBatch extends ExecuteBatch {

OneByOneExecuteBatch(TerminationGuard terminationGuard,
BatchAndTotalCollector collector,
List<Map<String, Object>> batch,
BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer) {
super(terminationGuard, collector, batch, consumer);
}

@Override
public final Long apply(Transaction txInThread) {
if (Util.transactionIsTerminated(terminationGuard)) return 0L;
AtomicLong localCount = new AtomicLong(collector.getCount());
return batch.stream().mapToLong(
p -> {
if (localCount.get() % 1000 == 0 && Util.transactionIsTerminated(terminationGuard)) {
return 0;
}
Map<String, Object> params = merge(p, Util.map("_count", localCount.get(), "_batch", batch));
return executeAndReportErrors(txInThread, consumer, params, batch, 1, localCount, collector);
}).sum();
}
}

private static long executeAndReportErrors(Transaction tx, BiFunction<Transaction, Map<String, Object>, QueryStatistics> consumer, Map<String, Object> params,
List<Map<String, Object>> batch, int returnValue, AtomicLong localCount, BatchAndTotalCollector collector) {
try {
QueryStatistics statistics = consumer.apply(tx, params);
if (localCount!=null) {
localCount.getAndIncrement();
}
collector.updateStatistics(statistics);
return returnValue;
} catch (Exception e) {
collector.incrementFailedOps(batch.size());
collector.amendFailedParamsMap(batch);
recordError(collector.getOperationErrors(), e);
throw e;
}
}

public static class JobInfo {
public final String name;
public long delay;
public long rate;
public boolean done;
public boolean cancelled;

public JobInfo(String name) {
this.name = name;
}

public JobInfo(String name, long delay, long rate) {
this.name = name;
this.delay = delay;
this.rate = rate;
}

public JobInfo update(Future future) {
this.done = future.isDone();
this.cancelled = future.isCancelled();
return this;
}

@Override
public boolean equals(Object o) {
return this == o || o instanceof JobInfo && name.equals(((JobInfo) o).name);
}

@Override
public int hashCode() {
return name.hashCode();
}
}

private class Countdown implements Runnable {
private final String name;
private final String statement;
Expand Down
4 changes: 3 additions & 1 deletion full/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ shadowJar {

dependencies {
implementation project(":common")
apt project(":processor")
testImplementation project(":common").sourceSets.test.output
testImplementation project(':test-utils')
testImplementation project(":core")
apt project(":processor")

compileOnly "org.jetbrains.kotlin:kotlin-stdlib"
testImplementation "org.jetbrains.kotlin:kotlin-stdlib"
Expand Down
Loading

0 comments on commit b50f00b

Please sign in to comment.