diff --git a/engine-agent/src/main/java/io/camunda/zeebe/process/test/engine/agent/RecordStreamSourceWrapper.java b/engine-agent/src/main/java/io/camunda/zeebe/process/test/engine/agent/RecordStreamSourceWrapper.java index 3e7e9df24..f450790af 100644 --- a/engine-agent/src/main/java/io/camunda/zeebe/process/test/engine/agent/RecordStreamSourceWrapper.java +++ b/engine-agent/src/main/java/io/camunda/zeebe/process/test/engine/agent/RecordStreamSourceWrapper.java @@ -2,12 +2,15 @@ import io.camunda.zeebe.process.test.api.RecordStreamSource; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.stream.StreamSupport; public class RecordStreamSourceWrapper { private final List mappedRecords = new ArrayList<>(); private final RecordStreamSource recordStreamSource; + private volatile long lastEventPosition = -1L; public RecordStreamSourceWrapper(final RecordStreamSource recordStreamSource) { this.recordStreamSource = recordStreamSource; @@ -15,9 +18,15 @@ public RecordStreamSourceWrapper(final RecordStreamSource recordStreamSource) { public List getMappedRecords() { synchronized (mappedRecords) { - mappedRecords.clear(); - recordStreamSource.records().forEach(record -> mappedRecords.add(record.toJson())); + StreamSupport.stream(recordStreamSource.records().spliterator(), false) + .filter(record -> record.getPosition() > lastEventPosition) + .forEach( + record -> { + mappedRecords.add(record.toJson()); + lastEventPosition = record.getPosition(); + }); } - return new ArrayList<>(mappedRecords); + + return Collections.unmodifiableList(mappedRecords); } }