Skip to content

Commit

Permalink
refactor: created separate connector for Redis replication and ping
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Mar 15, 2024
1 parent cf1bd00 commit 569acfa
Show file tree
Hide file tree
Showing 159 changed files with 3,244 additions and 3,373 deletions.
19 changes: 6 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ subprojects { subproj ->
}

dependencies {
compileOnly group: 'com.google.code.findbugs', name: 'jsr305', version: jsr305Version
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: junitVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params', version: junitVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: junitVersion
testImplementation group: 'org.junit.platform', name: 'junit-platform-launcher', version: junitPlatformVersion
testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: testcontainersVersion
testImplementation 'org.junit.jupiter:junit-jupiter-api'
testImplementation 'org.junit.jupiter:junit-jupiter-params'
testImplementation 'org.junit.jupiter:junit-jupiter-engine'
testImplementation 'org.junit.platform:junit-platform-launcher'
testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.slf4j:slf4j-simple'
testImplementation group: 'com.redis', name: 'testcontainers-redis', version: testcontainersRedisVersion
}

Expand All @@ -111,13 +111,6 @@ subprojects { subproj ->
all*.exclude module: 'spring-boot-starter-logging'
}

configurations.all {
resolutionStrategy.eachDependency { DependencyResolveDetails details ->
if (details.requested.name == 'lettuce-core' ) {
details.useVersion lettuceVersion
}
}
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions connectors/riot-db/riot-db.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ dependencies {
implementation 'org.springframework:spring-jdbc'
implementation 'com.mysql:mysql-connector-j'
implementation 'org.postgresql:postgresql'
implementation group: 'com.microsoft.sqlserver', name: 'mssql-jdbc', version: mssqlVersion
implementation group: 'com.oracle.ojdbc', name: 'ojdbc8', version: oracleVersion
implementation 'com.microsoft.sqlserver:mssql-jdbc'
implementation 'com.oracle.database.jdbc:ojdbc11'
}

compileJava {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,58 @@
package com.redis.riot.db;

import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;

public class DataSourceOptions {

private String driver;
private String driver;

private String url;

private String url;
private String username;

private String username;
private String password;

private String password;
public String getDriver() {
return driver;
}

public String getDriver() {
return driver;
}
public void setDriver(String driver) {
this.driver = driver;
}

public void setDriver(String driver) {
this.driver = driver;
}
public String getUrl() {
return url;
}

public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}

public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}

public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}

public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}

public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}

public void setPassword(String password) {
this.password = password;
}
public DataSource dataSource() {
DataSourceProperties properties = new DataSourceProperties();
properties.setUrl(url);
properties.setDriverClassName(driver);
properties.setUsername(username);
properties.setPassword(password);
return properties.initializeDataSourceBuilder().build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
Expand All @@ -13,55 +11,51 @@

public class DatabaseExport extends AbstractMapExport {

public static final boolean DEFAULT_ASSERT_UPDATES = true;

private String sql;

private DataSourceOptions dataSourceOptions = new DataSourceOptions();

private boolean assertUpdates = DEFAULT_ASSERT_UPDATES;

public void setSql(String sql) {
this.sql = sql;
}

public void setDataSourceOptions(DataSourceOptions dataSourceOptions) {
this.dataSourceOptions = dataSourceOptions;
}

public void setAssertUpdates(boolean assertUpdates) {
this.assertUpdates = assertUpdates;
}

@Override
protected JdbcBatchItemWriter<Map<String, Object>> writer() {
JdbcBatchItemWriterBuilder<Map<String, Object>> writer = new JdbcBatchItemWriterBuilder<>();
writer.itemSqlParameterSourceProvider(NullableSqlParameterSource::new);
writer.dataSource(dataSource());
writer.sql(sql);
writer.assertUpdates(assertUpdates);
return writer.build();
}

private DataSource dataSource() {
return DatabaseUtils.dataSource(dataSourceOptions);
}

private static class NullableSqlParameterSource extends MapSqlParameterSource {

public NullableSqlParameterSource(@Nullable Map<String, ?> values) {
super(values);
}

@Override
@Nullable
public Object getValue(String paramName) {
if (!hasValue(paramName)) {
return null;
}
return super.getValue(paramName);
}

}
public static final boolean DEFAULT_ASSERT_UPDATES = true;

private String sql;
private DataSourceOptions dataSourceOptions = new DataSourceOptions();
private boolean assertUpdates = DEFAULT_ASSERT_UPDATES;

public void setSql(String sql) {
this.sql = sql;
}

public void setDataSourceOptions(DataSourceOptions dataSourceOptions) {
this.dataSourceOptions = dataSourceOptions;
}

public void setAssertUpdates(boolean assertUpdates) {
this.assertUpdates = assertUpdates;
}

@Override
protected JdbcBatchItemWriter<Map<String, Object>> writer() {
JdbcBatchItemWriterBuilder<Map<String, Object>> builder = new JdbcBatchItemWriterBuilder<>();
builder.itemSqlParameterSourceProvider(NullableSqlParameterSource::new);
builder.dataSource(dataSourceOptions.dataSource());
builder.sql(sql);
builder.assertUpdates(assertUpdates);
JdbcBatchItemWriter<Map<String, Object>> writer = builder.build();
writer.afterPropertiesSet();
return writer;
}

private static class NullableSqlParameterSource extends MapSqlParameterSource {

public NullableSqlParameterSource(@Nullable Map<String, ?> values) {
super(values);
}

@Override
@Nullable
public Object getValue(String paramName) {
if (!hasValue(paramName)) {
return null;
}
return super.getValue(paramName);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,28 @@

import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.AbstractCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.util.ClassUtils;

import com.redis.riot.core.AbstractImport;
import com.redis.riot.core.RiotContext;

public class DatabaseImport extends AbstractImport {

public static final int DEFAULT_FETCH_SIZE = AbstractCursorItemReader.VALUE_NOT_SET;

public static final int DEFAULT_MAX_RESULT_SET_ROWS = AbstractCursorItemReader.VALUE_NOT_SET;

public static final int DEFAULT_QUERY_TIMEOUT = AbstractCursorItemReader.VALUE_NOT_SET;

private String sql;

private DataSourceOptions dataSourceOptions = new DataSourceOptions();

private int maxItemCount;

private int fetchSize = DEFAULT_FETCH_SIZE;

private int maxResultSetRows = DEFAULT_MAX_RESULT_SET_ROWS;

private int queryTimeout = DEFAULT_QUERY_TIMEOUT;

private boolean useSharedExtendedConnection;

private boolean verifyCursorPosition;

public String getSql() {
Expand Down Expand Up @@ -104,18 +91,15 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) {
}

@Override
protected Job job(RiotContext executionContext) throws Exception {
protected Job job() {
String name = ClassUtils.getShortName(getClass());
ItemReader<Map<String, Object>> reader = reader();
ItemWriter<Map<String, Object>> writer = writer(executionContext);
return jobBuilder().start(step(name, reader, null, writer).build()).build();
return jobBuilder().start(step(name, reader(), null, writer()).build()).build();
}

private ItemReader<Map<String, Object>> reader() {
DataSource dataSource = DatabaseUtils.dataSource(dataSourceOptions);
JdbcCursorItemReaderBuilder<Map<String, Object>> builder = new JdbcCursorItemReaderBuilder<>();
builder.saveState(false);
builder.dataSource(dataSource);
builder.dataSource(dataSourceOptions.dataSource());
builder.name("database-reader");
builder.rowMapper(new ColumnMapRowMapper());
builder.sql(sql);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,18 @@
import com.redis.lettucemod.search.IndexInfo;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.AbstractImport;
import com.redis.riot.core.RiotContext;
import com.redis.riot.core.RiotUtils;
import com.redis.spring.batch.common.Range;

public class FakerImport extends AbstractImport {

public static final int DEFAULT_COUNT = 1000;

public static final Locale DEFAULT_LOCALE = Locale.ENGLISH;

public static final Range DEFAULT_INDEX_RANGE = Range.from(1);

private Map<String, Expression> fields = new LinkedHashMap<>();

private int count = DEFAULT_COUNT;

private String searchIndex;

private Locale locale = DEFAULT_LOCALE;

public Map<String, Expression> getFields() {
Expand Down Expand Up @@ -69,32 +63,32 @@ public void setLocale(Locale locale) {
}

@Override
protected Job job(RiotContext executionContext) throws Exception {
protected Job job() {
String name = ClassUtils.getShortName(getClass());
FakerItemReader reader = reader(executionContext);
ItemWriter<Map<String, Object>> writer = writer(executionContext);
FakerItemReader reader = reader();
ItemWriter<Map<String, Object>> writer = writer();
return jobBuilder().start(step(name, reader, null, writer).build()).build();
}

private FakerItemReader reader(RiotContext executionContext) {
private FakerItemReader reader() {
FakerItemReader reader = new FakerItemReader();
reader.setMaxItemCount(count);
reader.setLocale(locale);
reader.setFields(fields(executionContext));
reader.setFields(fields());
return reader;
}

private Map<String, Expression> fields(RiotContext executionContext) {
private Map<String, Expression> fields() {
Map<String, Expression> allFields = new LinkedHashMap<>(fields);
if (searchIndex != null) {
allFields.putAll(searchIndexFields(executionContext));
allFields.putAll(searchIndexFields());
}
return allFields;
}

private Map<String, Expression> searchIndexFields(RiotContext executionContext) {
private Map<String, Expression> searchIndexFields() {
Map<String, Expression> searchFields = new LinkedHashMap<>();
RediSearchCommands<String, String> commands = executionContext.getRedisContext().getConnection().sync();
RediSearchCommands<String, String> commands = getRedisConnection().sync();
IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(searchIndex));
for (Field<String> field : info.getFields()) {
searchFields.put(field.getName(), RiotUtils.parse(expression(field)));
Expand Down
Loading

0 comments on commit 569acfa

Please sign in to comment.