Skip to content

Commit

Permalink
[Fix serverlessworkflow#484] Removing SortedArrayList
Browse files Browse the repository at this point in the history
Maybe for larger list that has to be kept sorte that one is useful, but
in this case is faster to add everything unsorted (all insertions are
really fast) and then sort (there is only one sort of a small amount of
items)

Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
  • Loading branch information
fjtirado committed Dec 3, 2024
1 parent 74fc958 commit 10488ea
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 165 deletions.
26 changes: 13 additions & 13 deletions impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,6 @@ public TaskContext(JsonNode input, WorkflowPosition position) {
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
}

public TaskContext<T> copy() {
return new TaskContext<T>(
rawInput,
task,
position.copy(),
startedAt,
input,
output,
rawOutput,
flowDirective,
new HashMap<>(contextVariables));
}

public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
this(
input,
Expand Down Expand Up @@ -88,6 +75,19 @@ private TaskContext(
this.contextVariables = contextVariables;
}

public TaskContext<T> copy() {
return new TaskContext<T>(
rawInput,
task,
position.copy(),
startedAt,
input,
output,
rawOutput,
flowDirective,
new HashMap<>(contextVariables));
}

public void input(JsonNode input) {
this.input = input;
this.rawOutput = input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowState;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.generic.SortedArrayList;
import io.serverlessworkflow.impl.json.JsonUtils;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,8 +48,6 @@ protected ForkExecutor(ForkTask task, WorkflowDefinition definition) {
service = definition.executorService();
}

private record BranchContext(String taskName, TaskContext<?> taskContext) {}

@Override
protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> taskContext) {
ForkTaskConfiguration forkConfig = task.getFork();
Expand All @@ -62,13 +61,10 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
item.getName(),
service.submit(() -> executeBranch(workflow, taskContext.copy(), item, i)));
}
List<BranchContext> results =
new SortedArrayList<>(
(arg1, arg2) ->
arg1.taskContext.completedAt().compareTo(arg2.taskContext.completedAt()));
List<Map.Entry<String, TaskContext<?>>> results = new ArrayList<>();
for (Map.Entry<String, Future<TaskContext<?>>> entry : futures.entrySet()) {
try {
results.add(new BranchContext(entry.getKey(), entry.getValue().get()));
results.add(Map.entry(entry.getKey(), entry.getValue().get()));
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause instanceof RuntimeException) {
Expand All @@ -83,19 +79,22 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
ex);
}
}
if (!results.isEmpty()) {
taskContext.rawOutput(
forkConfig.isCompete()
? results.get(0).taskContext().output()
: JsonUtils.fromValue(
results.stream()
.map(
e ->
JsonUtils.mapper()
.createObjectNode()
.set(e.taskName(), e.taskContext().output()))
.collect(Collectors.toList())));
}
Stream<Map.Entry<String, TaskContext<?>>> sortedStream =
results.stream()
.sorted(
(arg1, arg2) ->
arg1.getValue().completedAt().compareTo(arg2.getValue().completedAt()));
taskContext.rawOutput(
forkConfig.isCompete()
? sortedStream.map(e -> e.getValue().output()).findFirst().orElseThrow()
: JsonUtils.fromValue(
sortedStream
.map(
e ->
JsonUtils.mapper()
.createObjectNode()
.set(e.getKey(), e.getValue().output()))
.collect(Collectors.toList())));
}
}

Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 10488ea

Please sign in to comment.