From 705c0d94dcf0b397ce57eae2718c48518627b561 Mon Sep 17 00:00:00 2001 From: Hai Yan <8153134+oeyh@users.noreply.github.com> Date: Fri, 7 Feb 2025 10:41:42 -0600 Subject: [PATCH] Support export from postgres (#5414) Signed-off-by: Hai Yan --- .../source/rds/configuration/EngineType.java | 2 +- .../state/DataFileProgressState.java | 15 ++++++++++++ .../state/ExportProgressState.java | 11 +++++++++ .../source/rds/export/DataFileLoader.java | 19 ++++++++------- .../source/rds/export/ExportScheduler.java | 9 ++++++- .../source/rds/leader/LeaderScheduler.java | 2 ++ .../source/rds/model/ExportObjectKey.java | 24 +++++++++++++++---- .../rds/stream/LogicalReplicationClient.java | 5 ++++ .../LogicalReplicationEventProcessor.java | 8 +++++-- .../source/rds/stream/StreamWorker.java | 2 ++ .../rds/leader/LeaderSchedulerTest.java | 9 +++++-- .../source/rds/model/ExportObjectKeyTest.java | 19 ++++++++++++++- .../stream/LogicalReplicationClientTest.java | 16 +++++++++++++ 13 files changed, 122 insertions(+), 19 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/EngineType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/EngineType.java index 20f7f3b534..99980eae71 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/EngineType.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/EngineType.java @@ -33,7 +33,7 @@ public String toString() { } @JsonCreator - public static EngineType fromOptionValue(final String option) { + public static EngineType fromString(final String option) { return ENGINE_TYPE_MAP.get(option); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java index 9fee60105f..e865e73af8 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java @@ -12,6 +12,9 @@ public class DataFileProgressState { + @JsonProperty("engineType") + private String engineType; + @JsonProperty("isLoaded") private boolean isLoaded = false; @@ -21,6 +24,10 @@ public class DataFileProgressState { @JsonProperty("sourceDatabase") private String sourceDatabase; + /** + * For MySQL, sourceTable is in the format of tableName + * For Postgres, sourceTable is in the format of schemaName.tableName + */ @JsonProperty("sourceTable") private String sourceTable; @@ -33,6 +40,14 @@ public class DataFileProgressState { @JsonProperty("snapshotTime") private long snapshotTime; + public String getEngineType() { + return engineType; + } + + public void setEngineType(String engineType) { + this.engineType = engineType; + } + public int getTotalRecords() { return totalRecords; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/ExportProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/ExportProgressState.java index e4bbeb4c98..74b387d3b4 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/ExportProgressState.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/ExportProgressState.java @@ -15,6 +15,9 @@ */ public class ExportProgressState { + @JsonProperty("engineType") + private String engineType; + @JsonProperty("snapshotId") private String snapshotId; @@ -48,6 +51,14 @@ public class ExportProgressState { @JsonProperty("status") private String status; + public String getEngineType() { + return engineType; + } + + public void setEngineType(String engineType) { + this.engineType = engineType; + } + public String getSnapshotId() { return snapshotId; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java index b0a205c9ec..75f4dd0dc5 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType; import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; @@ -127,7 +128,7 @@ public void run() { final String fullTableName = progressState.getSourceDatabase() + DOT_DELIMITER + progressState.getSourceTable(); final List primaryKeys = progressState.getPrimaryKeyMap().getOrDefault(fullTableName, List.of()); - transformEvent(event, fullTableName); + transformEvent(event, fullTableName, EngineType.fromString(progressState.getEngineType())); final long snapshotTime = progressState.getSnapshotTime(); final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis(); @@ -173,13 +174,15 @@ public void run() { } } - private void transformEvent(final Event event, final String fullTableName) { - Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); - for (Map.Entry entry : event.toMap().entrySet()) { - final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), - entry.getValue(), null); - event.put(entry.getKey(), data); + private void transformEvent(final Event event, final String fullTableName, final EngineType engineType) { + // TODO: support data type mapping in Postgres + if (engineType == EngineType.MYSQL) { + Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); + for (Map.Entry entry : event.toMap().entrySet()) { + final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), + entry.getValue(), null); + event.put(entry.getKey(), data); + } } } - } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java index ce534747e1..97821a64b7 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; @@ -53,6 +54,7 @@ public class ExportScheduler implements Runnable { static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess"; static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure"; static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal"; + static final String DOT_DELIMITER = "."; private final S3Client s3Client; private final PluginMetrics pluginMetrics; @@ -65,6 +67,8 @@ public class ExportScheduler implements Runnable { private final Counter exportJobFailureCounter; private final Counter exportS3ObjectsTotalCounter; + private EngineType engineType; + private volatile boolean shutdownRequested = false; public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator, @@ -133,6 +137,7 @@ public void shutdown() { private String getOrCreateExportTaskId(ExportPartition exportPartition) { ExportProgressState progressState = exportPartition.getProgressState().get(); + engineType = EngineType.fromString(progressState.getEngineType()); if (progressState.getExportTaskId() != null) { LOG.info("Export task has already created for db {}", exportPartition.getDbIdentifier()); @@ -316,7 +321,9 @@ private void createDataFilePartitions(String bucket, final DataFileProgressState progressState = new DataFileProgressState(); final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKey); final String database = exportObjectKey.getDatabaseName(); - final String table = exportObjectKey.getTableName(); + final String table = engineType == EngineType.MYSQL ? + exportObjectKey.getTableName() : + exportObjectKey.getSchemaName() + DOT_DELIMITER + exportObjectKey.getTableName(); progressState.setSourceDatabase(database); progressState.setSourceTable(table); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index f2587a079d..79489211e5 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -81,6 +81,7 @@ public void run() { if (leaderPartition != null) { LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); if (!leaderProgressState.isInitialized()) { + LOG.info("Performing initialization as LEADER node."); init(); } } @@ -139,6 +140,7 @@ private void init() { private void createExportPartition(RdsSourceConfig sourceConfig) { ExportProgressState progressState = new ExportProgressState(); + progressState.setEngineType(sourceConfig.getEngine().toString()); progressState.setIamRoleArn(sourceConfig.getExport().getIamRoleArn()); progressState.setBucket(sourceConfig.getS3Bucket()); // This prefix is for data exported from RDS diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java index dab6cc8d40..ee53b552d5 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java @@ -10,7 +10,7 @@ /** * Represents the object key for an object exported to S3 by RDS. - * The object key has this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}" + * The object key has this structure: "{prefix}/{export task ID}/{database name}/{full table name}/{numbered folder}/{file name}" */ public class ExportObjectKey { @@ -18,14 +18,21 @@ public class ExportObjectKey { private final String prefix; private final String exportTaskId; private final String databaseName; + + /** + * schemaName is specific for Postgres; For MySQL, this schemaName has the same value as databaseName + */ + private final String schemaName; + private final String tableName; private final String numberedFolder; private final String fileName; - ExportObjectKey(final String prefix, final String exportTaskId, final String databaseName, final String tableName, final String numberedFolder, final String fileName) { + ExportObjectKey(final String prefix, final String exportTaskId, final String databaseName, final String schemaName, final String tableName, final String numberedFolder, final String fileName) { this.prefix = prefix; this.exportTaskId = exportTaskId; this.databaseName = databaseName; + this.schemaName = schemaName; this.tableName = tableName; this.numberedFolder = numberedFolder; this.fileName = fileName; @@ -42,13 +49,14 @@ public static ExportObjectKey fromString(final String objectKeyString) { .collect(Collectors.joining(S3_PATH_DELIMITER)); final String exportTaskId = parts[parts.length - 5]; final String databaseName = parts[parts.length - 4]; - // fullTableName is in the format of "databaseName.tableName" + // fullTableName is in the format of "databaseName.tableName" for MySQL and "schemaName.tableName" for Postgres final String fullTableName = parts[parts.length - 3]; + final String schemaName = fullTableName.split("\\.")[0]; final String tableName = fullTableName.split("\\.")[1]; final String numberedFolder = parts[parts.length - 2]; final String fileName = parts[parts.length - 1]; - return new ExportObjectKey(prefix, exportTaskId, databaseName, tableName, numberedFolder, fileName); + return new ExportObjectKey(prefix, exportTaskId, databaseName, schemaName, tableName, numberedFolder, fileName); } public String getPrefix() { @@ -63,6 +71,14 @@ public String getDatabaseName() { return databaseName; } + /** + * schemaName is specific for Postgres; For MySQL, this schemaName has the same value as databaseName + * @return schemaName + */ + public String getSchemaName() { + return schemaName; + } + public String getTableName() { return tableName; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java index 8eb3b9cde9..95838ff586 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java @@ -85,15 +85,20 @@ public void connect() { stream.setAppliedLSN(lsn); } catch (Exception e) { LOG.error("Exception while processing Postgres replication stream. ", e); + throw e; } } } stream.close(); disconnectRequested = false; + if (eventProcessor != null) { + eventProcessor.stopCheckpointManager(); + } LOG.debug("Replication stream closed successfully."); } catch (Exception e) { LOG.error("Exception while creating Postgres replication stream. ", e); + throw new RuntimeException(e); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java index a2a9aa1017..f49f2acc66 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java @@ -173,12 +173,16 @@ public void process(ByteBuffer msg) { public void stopClient() { try { logicalReplicationClient.disconnect(); - LOG.info("Binary log client disconnected."); + LOG.info("Logical replication client disconnected."); } catch (Exception e) { - LOG.error("Binary log client failed to disconnect.", e); + LOG.error("Logical replication client failed to disconnect.", e); } } + public void stopCheckpointManager() { + streamCheckpointManager.stop(); + } + void processBeginMessage(ByteBuffer msg) { currentLsn = msg.getLong(); long epochMicro = msg.getLong(); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java index 4da8798c90..d1f5c8b31c 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java @@ -62,6 +62,8 @@ public void processStream(final StreamPartition streamPartition) { LOG.info("Connect to database to read change events."); replicationLogClient.connect(); } catch (Exception e) { + LOG.warn("Error while connecting to replication stream, will retry."); + sourceCoordinator.giveUpPartition(streamPartition); throw new RuntimeException(e); } finally { try { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java index 060e3e9a29..e75f3dc28f 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java @@ -9,12 +9,15 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType; import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; @@ -92,12 +95,14 @@ void non_leader_node_should_not_perform_init() throws InterruptedException { verify(sourceCoordinator, never()).createPartition(any(ExportPartition.class)); } - @Test - void leader_node_should_perform_init_if_not_initialized() throws InterruptedException { + @ParameterizedTest + @EnumSource(EngineType.class) + void leader_node_should_perform_init_if_not_initialized(EngineType engineType) throws InterruptedException { when(sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).thenReturn(Optional.of(leaderPartition)); when(leaderPartition.getProgressState()).thenReturn(Optional.of(leaderProgressState)); when(leaderProgressState.isInitialized()).thenReturn(false); when(sourceConfig.isExportEnabled()).thenReturn(true); + when(sourceConfig.getEngine()).thenReturn(engineType); final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(leaderScheduler); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java index 18a66bd6e2..1a207a9031 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java @@ -15,13 +15,28 @@ class ExportObjectKeyTest { @Test - void test_fromString_with_valid_input_string() { + void test_fromString_with_valid_input_string_mysql() { final String objectKeyString = "prefix/export-task-id/db-name/db-name.table-name/1/file-name.parquet"; final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString); assertThat(exportObjectKey.getPrefix(), equalTo("prefix")); assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id")); assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name")); + assertThat(exportObjectKey.getSchemaName(), equalTo("db-name")); + assertThat(exportObjectKey.getTableName(), equalTo("table-name")); + assertThat(exportObjectKey.getNumberedFolder(), equalTo("1")); + assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet")); + } + + @Test + void test_fromString_with_valid_input_string_postgres() { + final String objectKeyString = "prefix/export-task-id/db-name/schema-name.table-name/1/file-name.parquet"; + final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString); + + assertThat(exportObjectKey.getPrefix(), equalTo("prefix")); + assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id")); + assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name")); + assertThat(exportObjectKey.getSchemaName(), equalTo("schema-name")); assertThat(exportObjectKey.getTableName(), equalTo("table-name")); assertThat(exportObjectKey.getNumberedFolder(), equalTo("1")); assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet")); @@ -35,6 +50,7 @@ void test_fromString_with_path_with_empty_prefix() { assertThat(exportObjectKey.getPrefix(), equalTo("")); assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id")); assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name")); + assertThat(exportObjectKey.getSchemaName(), equalTo("db-name")); assertThat(exportObjectKey.getTableName(), equalTo("table-name")); assertThat(exportObjectKey.getNumberedFolder(), equalTo("1")); assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet")); @@ -48,6 +64,7 @@ void test_fromString_with_path_with_multilevel_prefix() { assertThat(exportObjectKey.getPrefix(), equalTo("prefix1/prefix2/prefix3")); assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id")); assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name")); + assertThat(exportObjectKey.getSchemaName(), equalTo("db-name")); assertThat(exportObjectKey.getTableName(), equalTo("table-name")); assertThat(exportObjectKey.getNumberedFolder(), equalTo("1")); assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet")); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java index 45897335b5..be87bf5bda 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; @@ -89,6 +90,18 @@ void test_connect() throws SQLException, InterruptedException { verify(stream).setFlushedLSN(lsn); } + @Test + void test_connect_exception_should_throw() throws SQLException { + when(connectionManager.getConnection()).thenThrow(RuntimeException.class); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> logicalReplicationClient.connect()); + + assertThrows(RuntimeException.class, () -> logicalReplicationClient.connect()); + + executorService.shutdownNow(); + } + @Test void test_disconnect() throws SQLException, InterruptedException { final Connection connection = mock(Connection.class); @@ -119,6 +132,7 @@ void test_disconnect() throws SQLException, InterruptedException { logicalReplicationClient.disconnect(); Thread.sleep(20); verify(stream).close(); + verify(eventProcessor).stopCheckpointManager(); verifyNoMoreInteractions(stream, eventProcessor); executorService.shutdownNow(); @@ -155,6 +169,7 @@ void test_connect_disconnect_cycles() throws SQLException, InterruptedException logicalReplicationClient.disconnect(); Thread.sleep(20); verify(stream).close(); + verify(eventProcessor).stopCheckpointManager(); verifyNoMoreInteractions(stream, eventProcessor); // Second connect @@ -170,6 +185,7 @@ void test_connect_disconnect_cycles() throws SQLException, InterruptedException logicalReplicationClient.disconnect(); Thread.sleep(20); verify(stream, times(2)).close(); + verify(eventProcessor, times(2)).stopCheckpointManager(); verifyNoMoreInteractions(stream, eventProcessor); executorService.shutdownNow();