Skip to content

Commit

Permalink
test: Fixed replicateLiveKeyExclude
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Mar 19, 2024
1 parent 302882a commit e8d783e
Show file tree
Hide file tree
Showing 27 changed files with 216 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) {
@Override
protected Job job() {
String name = ClassUtils.getShortName(getClass());
return jobBuilder().start(step(name, reader(), null, writer()).build()).build();
return jobBuilder().start(step(name, reader(), writer())).build();
}

private ItemReader<Map<String, Object>> reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected Job job() {
String name = ClassUtils.getShortName(getClass());
FakerItemReader reader = reader();
ItemWriter<Map<String, Object>> writer = writer();
return jobBuilder().start(step(name, reader, null, writer).build()).build();
return jobBuilder().start(step(name, reader, writer)).build();
}

private FakerItemReader reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected Job job() {
ItemWriter<KeyValue<String>> writer = writer();
ItemProcessor<KeyValue<String>, KeyValue<String>> processor = new FunctionItemProcessor<>(
processor(StringCodec.UTF8));
return jobBuilder().start(step(getName(), reader, processor, writer).build()).build();
return jobBuilder().start(step(reader, processor, writer)).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected Job job() {
for (Resource resource : resources) {
ItemReader<KeyValue<String>> reader = reader(resource);
RedisItemWriter<String, String, KeyValue<String>> writer = writer();
steps.add(step(resource.getFilename(), reader, null, writer).build());
steps.add(step(resource.getFilename(), reader, writer));
}
Iterator<TaskletStep> iterator = steps.iterator();
SimpleJobBuilder job = jobBuilder().start(iterator.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
Expand Down Expand Up @@ -127,30 +128,27 @@ protected Job job() {
if (resources.isEmpty()) {
throw new IllegalArgumentException("No file found");
}
List<TaskletStep> steps = new ArrayList<>();
for (Resource resource : resources) {
steps.add(step(resource));
}
Iterator<TaskletStep> iterator = steps.iterator();
Iterator<TaskletStep> iterator = resources.stream().map(this::step).iterator();
SimpleJobBuilder job = jobBuilder().start(iterator.next());
while (iterator.hasNext()) {
job.next(iterator.next());
}
return job.build();
}

@Override
protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> step) {
return super.faultTolerant(step).skip(ParseException.class).noRetry(ParseException.class);
}

private TaskletStep step(Resource resource) {
ItemReader<Map<String, Object>> reader = reader(resource);
if (maxItemCount != null && reader instanceof AbstractItemCountingItemStreamItemReader) {
((AbstractItemCountingItemStreamItemReader<Map<String, Object>>) reader).setMaxItemCount(maxItemCount);
}
ItemProcessor<Map<String, Object>, Map<String, Object>> processor = processor();
ItemWriter<Map<String, Object>> writer = writer();
FaultTolerantStepBuilder<Map<String, Object>, Map<String, Object>> step = step(resource.getFilename(), reader,
processor, writer);
step.skip(ParseException.class);
step.noRetry(ParseException.class);
return step.build();
return step(resource.getFilename(), reader, processor, writer);
}

private ItemReader<Map<String, Object>> reader(Resource resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class GeneratorImport extends AbstractStructImport {
protected Job job() {
GeneratorItemReader reader = reader();
RedisItemWriter<String, String, KeyValue<String>> writer = writer();
return jobBuilder().start(step(getName(), reader, null, writer).build()).build();
return jobBuilder().start(step(reader, writer)).build();
}

private GeneratorItemReader reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
import org.springframework.batch.core.job.builder.JobFlowBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.spel.support.StandardEvaluationContext;
Expand All @@ -21,14 +24,15 @@
import com.redis.riot.core.RedisClientOptions;
import com.redis.riot.core.RedisWriterOptions;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.RedisItemReader.Mode;
import com.redis.spring.batch.RedisItemWriter;
import com.redis.spring.batch.common.KeyComparison;
import com.redis.spring.batch.common.KeyComparisonItemReader;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.reader.AbstractKeyValueItemReader;
import com.redis.spring.batch.reader.DumpItemReader;
import com.redis.spring.batch.reader.KeyTypeItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.writer.DumpItemWriter;
import com.redis.spring.batch.writer.KeyValueItemWriter;
import com.redis.spring.batch.writer.StructItemWriter;
Expand Down Expand Up @@ -102,29 +106,26 @@ protected void close() {

@Override
protected Job job() {
FaultTolerantStepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> scanStep = step(STEP_SCAN, reader("scan-reader"));
RedisItemReader<byte[], byte[], KeyValue<byte[]>> reader = reader("live-reader");
reader.setMode(RedisItemReader.Mode.LIVE);
FaultTolerantStepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> liveStep = step(STEP_LIVE, reader);
KeyComparisonStatusCountItemWriter comparisonWriter = new KeyComparisonStatusCountItemWriter();
FaultTolerantStepBuilder<KeyComparison, KeyComparison> compareStep = step(STEP_COMPARE, comparisonReader(),
null, comparisonWriter);
if (showDiffs) {
compareStep.listener(new KeyComparisonDiffLogger());
}
compareStep.listener(new KeyComparisonSummaryLogger(comparisonWriter));
RedisItemReader<byte[], byte[], KeyValue<byte[]>> reader = reader("scan-reader");
SimpleStepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> scanStep = replicationStep(STEP_SCAN, reader);
RedisItemReader<byte[], byte[], KeyValue<byte[]>> liveReader = reader("live-reader");
liveReader.setMode(Mode.LIVE);
FlushingStepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> liveStep = flushingStep(
replicationStep(STEP_LIVE, liveReader));
KeyComparisonStatusCountItemWriter compareWriter = new KeyComparisonStatusCountItemWriter();
TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter);
switch (mode) {
case COMPARE:
return jobBuilder().start(compareStep.build()).build();
return jobBuilder().start(compareStep).build();
case LIVE:
checkKeyspaceNotificationEnabled();
SimpleFlow scanFlow = flow("scan").start(scanStep.build()).build();
SimpleFlow liveFlow = flow("live").start(liveStep.build()).build();
SimpleFlow scanFlow = flow("scan").start(build(scanStep)).build();
SimpleFlow liveFlow = flow("live").start(build(liveStep)).build();
SimpleFlow replicateFlow = flow("replicate").split(new SimpleAsyncTaskExecutor()).add(liveFlow, scanFlow)
.build();
JobFlowBuilder live = jobBuilder().start(replicateFlow);
if (shouldCompare()) {
live.next(compareStep.build());
live.next(compareStep);
}
return live.build().build();
case LIVEONLY:
Expand All @@ -133,7 +134,7 @@ protected Job job() {
case SNAPSHOT:
SimpleJobBuilder snapshot = jobBuilder().start(scanStep.build());
if (shouldCompare()) {
snapshot.next(compareStep.build());
snapshot.next(compareStep);
}
return snapshot.build();
default:
Expand All @@ -149,16 +150,34 @@ private boolean shouldCompare() {
return compareMode != CompareMode.NONE && !isDryRun();
}

private FaultTolerantStepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> step(String name,
private SimpleStepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> replicationStep(String name,
RedisItemReader<byte[], byte[], KeyValue<byte[]>> reader) {
RedisItemWriter<byte[], byte[], KeyValue<byte[]>> writer = writer();
ItemProcessor<KeyValue<byte[]>, KeyValue<byte[]>> processor = new FunctionItemProcessor<>(
processor(ByteArrayCodec.INSTANCE));
FaultTolerantStepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> step = step(name, reader, processor, writer);
if (log.isDebugEnabled()) {
step.listener(new KeyValueWriteListener<>(reader.getCodec(), log));
return stepBuilder(name, reader, processor, writer);
}

@Override
protected void configureStep(SimpleStepBuilder<?, ?> step, String name, ItemReader<?> reader,
ItemWriter<?> writer) {
super.configureStep(step, name, reader, writer);
switch (name) {
case STEP_COMPARE:
if (showDiffs) {
step.listener(new KeyComparisonDiffLogger());
}
step.listener(new KeyComparisonSummaryLogger((KeyComparisonStatusCountItemWriter) writer));
break;
case STEP_LIVE:
case STEP_SCAN:
if (log.isDebugEnabled()) {
step.listener(new KeyValueWriteListener<>(ByteArrayCodec.INSTANCE, log));
}
break;
default:
break;
}
return step;
}

private void checkKeyspaceNotificationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.redis.riot.redis;

import java.time.Duration;
import java.util.Map;
import java.util.function.Predicate;

Expand All @@ -14,10 +13,8 @@
import com.redis.riot.core.RedisClientOptions;
import com.redis.riot.core.RiotUtils;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.common.KeyComparisonItemReader;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.Range;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.test.AbstractTargetTestBase;
import com.redis.testcontainers.RedisServer;
Expand Down Expand Up @@ -97,15 +94,6 @@ void keyProcessorWithDate(TestInfo info) throws Throwable {
Assertions.assertEquals(value1, targetCommands.get("1273449600000:" + key1));
}

protected KeyComparisonItemReader comparisonReader(TestInfo info) {
StructItemReader<String, String> sourceReader = RedisItemReader.struct(client);
StructItemReader<String, String> targetReader = RedisItemReader.struct(targetClient);
KeyComparisonItemReader comparator = new KeyComparisonItemReader(sourceReader, targetReader);
comparator.setName(name(testInfo(info, "comparison-reader")));
comparator.setTtlTolerance(Duration.ofMillis(100));
return comparator;
}

@Test
void filterKeySlot(TestInfo info) throws Exception {
enableKeyspaceNotifications(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.expression.spel.support.StandardEvaluationContext;
Expand All @@ -18,6 +19,7 @@
import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.reader.AbstractKeyValueItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;

import io.lettuce.core.codec.RedisCodec;

Expand Down Expand Up @@ -103,6 +105,11 @@ public <K> ItemProcessor<K, K> keyFilteringProcessor(RedisCodec<K, ?> codec) {
return new PredicateItemProcessor<>(predicate);
}

protected <I, O> FlushingStepBuilder<I, O> flushingStep(SimpleStepBuilder<I, O> step) {
return new FlushingStepBuilder<>(step).interval(readerOptions.getFlushInterval())
.idleTimeout(readerOptions.getIdleTimeout());
}

public RedisReaderOptions getReaderOptions() {
return readerOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

import javax.sql.DataSource;

Expand All @@ -24,6 +23,7 @@
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
Expand All @@ -43,7 +43,6 @@
import org.springframework.util.ClassUtils;

import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.writer.AbstractOperationItemWriter;
import com.redis.spring.batch.writer.StructItemWriter;

Expand All @@ -62,7 +61,7 @@ public abstract class AbstractJobRunnable extends AbstractRunnable {
private static final String FAILED_JOB_MESSAGE = "Error executing job %s";

private String name;
private Consumer<RiotStep<?, ?>> stepConfigurer;
private List<StepConfigurator> stepConfigurators = new ArrayList<>();
private int threads = DEFAULT_THREADS;
private int chunkSize = DEFAULT_CHUNK_SIZE;
private Duration sleep = DEFAULT_SLEEP;
Expand All @@ -83,8 +82,8 @@ protected String name(String... suffixes) {
return String.join("-", elements);
}

public void setStepConfigurer(Consumer<RiotStep<?, ?>> stepConfigurer) {
this.stepConfigurer = stepConfigurer;
public void addStepConfigurator(StepConfigurator configurator) {
stepConfigurators.add(configurator);
}

public void setJobRepository(JobRepository jobRepository) {
Expand Down Expand Up @@ -173,34 +172,44 @@ protected JobBuilder jobBuilder() {
return writer;
}

protected <I, O> FaultTolerantStepBuilder<I, O> step(String name, ItemReader<I> reader,
protected <I, O> TaskletStep step(ItemReader<I> reader, ItemWriter<O> writer) {
return step(getName(), reader, null, writer);
}

protected <I, O> TaskletStep step(ItemReader<I> reader, ItemProcessor<I, O> processor, ItemWriter<O> writer) {
return step(getName(), reader, processor, writer);
}

protected <I, O> TaskletStep step(String name, ItemReader<I> reader, ItemWriter<O> writer) {
return step(name, reader, null, writer);
}

protected <I, O> TaskletStep step(String name, ItemReader<I> reader, ItemProcessor<I, O> processor,
ItemWriter<O> writer) {
return faultTolerant(stepBuilder(name, reader, processor, writer)).build();
}

protected <I, O> SimpleStepBuilder<I, O> stepBuilder(String name, ItemReader<I> reader,
ItemProcessor<I, O> processor, ItemWriter<O> writer) {
RiotStep<I, O> riotStep = new RiotStep<>();
riotStep.setName(name);
riotStep.setReader(reader);
riotStep.setProcessor(processor);
riotStep.setWriter(writer);
if (stepConfigurer != null) {
stepConfigurer.accept(riotStep);
}
SimpleStepBuilder<I, O> step = new StepBuilder(riotStep.getName(), jobRepository).chunk(chunkSize,
transactionManager);
step.reader(reader(riotStep.getReader()));
step.processor(riotStep.getProcessor());
step.writer(writer(riotStep.getWriter()));
step.taskExecutor(taskExecutor());
riotStep.getConfigurer().accept(step);
if (riotStep.getReader() instanceof RedisItemReader) {
RedisItemReader<?, ?, ?> redisReader = (RedisItemReader<?, ?, ?>) riotStep.getReader();
if (redisReader.isLive()) {
step = new FlushingStepBuilder<>(step).interval(redisReader.getFlushInterval())
.idleTimeout(redisReader.getIdleTimeout());
}
}
return faultTolerant(step);
SimpleStepBuilder<I, O> builder = new StepBuilder(name, jobRepository).chunk(chunkSize, transactionManager);
builder.reader(reader(reader));
builder.processor(processor);
builder.writer(writer(writer));
builder.taskExecutor(taskExecutor());
configureStep(builder, name, reader, writer);
stepConfigurators.forEach(s -> s.configure(builder, name, reader, writer));
return builder;
}

protected void configureStep(SimpleStepBuilder<?, ?> step, String name, ItemReader<?> reader,
ItemWriter<?> writer) {
}

protected <I, O> TaskletStep build(SimpleStepBuilder<I, O> step) {
return faultTolerant(step).build();
}

private <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> step) {
protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> step) {
FaultTolerantStepBuilder<I, O> ftStep = step.faultTolerant();
ftStep.skipLimit(skipLimit);
ftStep.retryLimit(retryLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected Job job() {
StructItemReader<String, String> reader = reader();
ItemProcessor<KeyValue<String>, Map<String, Object>> processor = processor();
ItemWriter<Map<String, Object>> writer = writer();
return jobBuilder().start(step(getName(), reader, processor, writer).build()).build();
return jobBuilder().start(step(reader, processor, writer)).build();
}

protected StructItemReader<String, String> reader() {
Expand Down
Loading

0 comments on commit e8d783e

Please sign in to comment.