diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt index d37acf278ab57..4dd13fc254b7a 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt @@ -14,7 +14,6 @@ import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates -import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons import io.debezium.connector.mongodb.MongoDbConnector @@ -81,119 +80,110 @@ class CdcPartitionReaderMongoTest : fn(it.getCollection(stream.name)) } - override fun createDebeziumOperations(): DebeziumOperations { - return object : AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { - override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? { - val resumeToken: String = - recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText() - ?: return null - return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken)) - } + override fun createDebeziumOperations(): DebeziumOperations = + MongoTestDebeziumOperations() - override fun position(sourceRecord: SourceRecord): BsonTimestamp? { - val offset: Map = sourceRecord.sourceOffset() - val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null - return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64)) - } + inner class MongoTestDebeziumOperations : AbstractDebeziumOperationsForTest() { - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { - return super.deserialize(opaqueStateValue, streams).let { - DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) - } - } + override fun position(offset: DebeziumOffset): BsonTimestamp { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt()) + } - override fun deserialize( - key: DebeziumRecordKey, - value: DebeziumRecordValue, - stream: Stream, - ): DeserializedRecord { - val id: Int = key.element("id").asInt() - val record: Record = - if (value.operation == "d") { - Delete(id) - } else { - val v: Int? = - value.after - .takeIf { it.isTextual } - ?.asText() - ?.let { Jsons.readTree(it)["v"] } - ?.asInt() - if (v == null) { - // In case a mongodb document was updated and then deleted, the update - // change - // event will not have any information ({after: null}) - // We are going to treat it as a Delete. - Delete(id) - } else if (value.operation == "u") { - Update(id, v) - } else { - Insert(id, v) - } - } - return DeserializedRecord( - data = Jsons.valueToTree(record), - changes = emptyMap(), - ) - } + override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? { + val resumeToken: String = + recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText() ?: return null + return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken)) + } - override fun position(offset: DebeziumOffset): BsonTimestamp { - val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode - return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt()) - } + override fun position(sourceRecord: SourceRecord): BsonTimestamp? { + val offset: Map = sourceRecord.sourceOffset() + val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null + return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64)) + } - override fun synthesize(): DebeziumInput { - val resumeToken: BsonDocument = currentResumeToken() - val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) - val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value - val key: ArrayNode = - Jsons.arrayNode().apply { - add(stream.namespace) - add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ord", timestamp.inc) - put("sec", timestamp.time) - put("resume_token", resumeTokenString) - } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) + override fun generateColdStartOffset(): DebeziumOffset { + val resumeToken: BsonDocument = currentResumeToken() + val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) + val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value + val key: ArrayNode = + Jsons.arrayNode().apply { + add(stream.namespace) + add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ord", timestamp.inc) + put("sec", timestamp.time) + put("resume_token", resumeTokenString) + } + return DebeziumOffset(mapOf(key to value)) + } + + override fun generateColdStartProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MongoDbConnector::class.java) + .withDebeziumName(stream.namespace!!) + .withHeartbeats(heartbeat) + .with("capture.scope", "database") + .with("capture.target", stream.namespace!!) + .with("mongodb.connection.string", container.connectionString) + .with("snapshot.mode", "no_data") + .with( + "collection.include.list", + DebeziumPropertiesBuilder.joinIncludeList( + listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) + ) + ) + .with("database.include.list", stream.namespace!!) + .withOffset() + .buildMap() + + override fun generateWarmStartProperties(streams: List): Map = + generateColdStartProperties() + + fun currentResumeToken(): BsonDocument = + container.withMongoDatabase { mongoDatabase: MongoDatabase -> + val pipeline = listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) + mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { + it.tryNext() + it.resumeToken!! + } } - fun currentResumeToken(): BsonDocument = - container.withMongoDatabase { mongoDatabase: MongoDatabase -> - val pipeline = - listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) - mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { - it.tryNext() - it.resumeToken!! + override fun deserializeRecord( + key: DebeziumRecordKey, + value: DebeziumRecordValue, + stream: Stream + ): DeserializedRecord { + val id: Int = key.element("id").asInt() + val record: Record = + if (value.operation == "d") { + Delete(id) + } else { + val v: Int? = + value.after + .takeIf { it.isTextual } + ?.asText() + ?.let { Jsons.readTree(it)["v"] } + ?.asInt() + if (v == null) { + // In case a mongodb document was updated and then deleted, the update + // change + // event will not have any information ({after: null}) + // We are going to treat it as a Delete. + Delete(id) + } else if (value.operation == "u") { + Update(id, v) + } else { + Insert(id, v) } } - - fun debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MongoDbConnector::class.java) - .withDebeziumName(stream.namespace!!) - .withHeartbeats(heartbeat) - .with("capture.scope", "database") - .with("capture.target", stream.namespace!!) - .with("mongodb.connection.string", container.connectionString) - .with("snapshot.mode", "no_data") - .with( - "collection.include.list", - DebeziumPropertiesBuilder.joinIncludeList( - listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) - ) - ) - .with("database.include.list", stream.namespace!!) - .withOffset() - .buildMap() + return DeserializedRecord( + data = Jsons.valueToTree(record), + changes = emptyMap(), + ) } } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt index 263f48198555f..7171e64079d54 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt @@ -6,9 +6,7 @@ package io.airbyte.cdk.read.cdc import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode -import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream -import io.airbyte.cdk.read.cdc.* import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons import io.debezium.connector.mysql.MySqlConnector @@ -73,90 +71,82 @@ class CdcPartitionReaderMySQLTest : } override fun createDebeziumOperations(): DebeziumOperations = - object : AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { - override fun position(offset: DebeziumOffset): Position { - val offsetAsJson = offset.wrapped.values.first() - val retVal = Position(offsetAsJson["file"].asText(), offsetAsJson["pos"].asLong()) - return retVal - } + MySQLTestDebeziumOperations() - override fun position(recordValue: DebeziumRecordValue): Position? { - val file: String = - recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null - val pos: Long = - recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong() - ?: return null - return Position(file, pos) - } + inner class MySQLTestDebeziumOperations : AbstractDebeziumOperationsForTest() { - override fun position(sourceRecord: SourceRecord): Position? { - val offset: Map = sourceRecord.sourceOffset() - val file: String = offset["file"]?.toString() ?: return null - val pos: Long = offset["pos"] as? Long ?: return null - return Position(file, pos) - } + override fun position(offset: DebeziumOffset): Position { + val offsetAsJson = offset.wrapped.values.first() + val retVal = Position(offsetAsJson["file"].asText(), offsetAsJson["pos"].asLong()) + return retVal + } - override fun synthesize(): DebeziumInput { - val position: Position = currentPosition() - val timestamp: Instant = Instant.now() - val key: ArrayNode = - Jsons.arrayNode().apply { - add(container.databaseName) - add(Jsons.objectNode().apply { put("server", container.databaseName) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ts_sec", timestamp.epochSecond) - put("file", position.file) - put("pos", position.pos) - } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = - DebeziumPropertiesBuilder() - .with(debeziumProperties()) - .with("snapshot.mode", "recovery") - .withStreams(listOf()) - .buildMap() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } + override fun position(recordValue: DebeziumRecordValue): Position? { + val file: String = + recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null + val pos: Long = + recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null + return Position(file, pos) + } - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { - return super.deserialize(opaqueStateValue, streams).let { - DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) - } - } + override fun position(sourceRecord: SourceRecord): Position? { + val offset: Map = sourceRecord.sourceOffset() + val file: String = offset["file"]?.toString() ?: return null + val pos: Long = offset["pos"] as? Long ?: return null + return Position(file, pos) + } - fun currentPosition(): Position = - container.withStatement { statement: Statement -> - statement.executeQuery("SHOW MASTER STATUS").use { - it.next() - Position(it.getString("File"), it.getLong("Position")) - } + override fun generateColdStartOffset(): DebeziumOffset { + val position: Position = currentPosition() + val timestamp: Instant = Instant.now() + val key: ArrayNode = + Jsons.arrayNode().apply { + add(container.databaseName) + add(Jsons.objectNode().apply { put("server", container.databaseName) }) } - - fun debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MySqlConnector::class.java) - .withDebeziumName(container.databaseName) - .withHeartbeats(heartbeat) - .with("include.schema.changes", "false") - .with("connect.keep.alive.interval.ms", "1000") - .withDatabase("hostname", container.host) - .withDatabase("port", container.firstMappedPort.toString()) - .withDatabase("user", container.username) - .withDatabase("password", container.password) - .withDatabase("dbname", container.databaseName) - .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) - .withDatabase("include.list", container.databaseName) - .withOffset() - .withSchemaHistory() - .with("snapshot.mode", "when_needed") - .withStreams(listOf(stream)) - .buildMap() + val value: ObjectNode = + Jsons.objectNode().apply { + put("ts_sec", timestamp.epochSecond) + put("file", position.file) + put("pos", position.pos) + } + return DebeziumOffset(mapOf(key to value)) } + + override fun generateColdStartProperties(): Map = + DebeziumPropertiesBuilder() + .with(generateWarmStartProperties(emptyList())) + .with("snapshot.mode", "recovery") + .withStreams(listOf()) + .buildMap() + + override fun generateWarmStartProperties(streams: List): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MySqlConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("include.schema.changes", "false") + .with("connect.keep.alive.interval.ms", "1000") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) + .withDatabase("include.list", container.databaseName) + .withOffset() + .withSchemaHistory() + .with("snapshot.mode", "when_needed") + .withStreams(streams) + .buildMap() + + fun currentPosition(): Position = + container.withStatement { statement: Statement -> + statement.executeQuery("SHOW MASTER STATUS").use { + it.next() + Position(it.getString("File"), it.getLong("Position")) + } + } + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt index 97a31616b6b6f..1d6571a4da697 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.read.cdc import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode -import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons @@ -74,81 +73,72 @@ class CdcPartitionReaderPostgresTest : connection.createStatement().use { fn(it) } } - override fun createDebeziumOperations(): DebeziumOperations { - return object : - AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { - override fun position(offset: DebeziumOffset): LogSequenceNumber { - val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode - return LogSequenceNumber.valueOf(offsetValue["lsn"].asLong()) - } + override fun createDebeziumOperations(): DebeziumOperations = + PostgresTestDebeziumOperations() - override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? { - val lsn: Long = - recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() - ?: return null - return LogSequenceNumber.valueOf(lsn) - } + inner class PostgresTestDebeziumOperations : + AbstractDebeziumOperationsForTest() { + override fun position(offset: DebeziumOffset): LogSequenceNumber { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return LogSequenceNumber.valueOf(offsetValue["lsn"].asLong()) + } - override fun position(sourceRecord: SourceRecord): LogSequenceNumber? { - val offset: Map = sourceRecord.sourceOffset() - val lsn: Long = offset["lsn"] as? Long ?: return null - return LogSequenceNumber.valueOf(lsn) - } + override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? { + val lsn: Long = + recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null + return LogSequenceNumber.valueOf(lsn) + } - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { - return super.deserialize(opaqueStateValue, streams).let { - DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) - } - } + override fun position(sourceRecord: SourceRecord): LogSequenceNumber? { + val offset: Map = sourceRecord.sourceOffset() + val lsn: Long = offset["lsn"] as? Long ?: return null + return LogSequenceNumber.valueOf(lsn) + } - override fun synthesize(): DebeziumInput { - val (position: LogSequenceNumber, txID: Long) = - container.withStatement { statement: Statement -> - statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { - it.next() - LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) - } + override fun generateWarmStartProperties(streams: List): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(PostgresConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("plugin.name", "pgoutput") + .with("slot.name", SLOT_NAME) + .with("publication.name", PUBLICATION_NAME) + .with("publication.autocreate.mode", "disabled") + .with("flush.lsn.source", "false") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withOffset() + .withStreams(streams) + .buildMap() + + override fun generateColdStartProperties(): Map = + generateWarmStartProperties(emptyList()) + + override fun generateColdStartOffset(): DebeziumOffset { + val (position: LogSequenceNumber, txID: Long) = + container.withStatement { statement: Statement -> + statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { + it.next() + LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) } - val timestamp: Instant = Instant.now() - val key: ArrayNode = - Jsons.arrayNode().apply { - add(container.databaseName) - add(Jsons.objectNode().apply { put("server", container.databaseName) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ts_usec", timestamp.toEpochMilli() * 1000L) - put("lsn", position.asLong()) - put("txId", txID) - } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } - - fun debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(PostgresConnector::class.java) - .withDebeziumName(container.databaseName) - .withHeartbeats(heartbeat) - .with("plugin.name", "pgoutput") - .with("slot.name", SLOT_NAME) - .with("publication.name", PUBLICATION_NAME) - .with("publication.autocreate.mode", "disabled") - .with("flush.lsn.source", "false") - .withDatabase("hostname", container.host) - .withDatabase("port", container.firstMappedPort.toString()) - .withDatabase("user", container.username) - .withDatabase("password", container.password) - .withDatabase("dbname", container.databaseName) - .withOffset() - .withStreams(listOf(stream)) - .buildMap() + } + val timestamp: Instant = Instant.now() + val key: ArrayNode = + Jsons.arrayNode().apply { + add(container.databaseName) + add(Jsons.objectNode().apply { put("server", container.databaseName) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ts_usec", timestamp.toEpochMilli() * 1000L) + put("lsn", position.asLong()) + put("txId", txID) + } + return DebeziumOffset(mapOf(key to value)) } } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt index 01bea3ef4f8ef..cbf5ae53c31d2 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt @@ -81,8 +81,15 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab */ fun integrationTest() { container.createStream() - val p0: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) - val r0: ReadResult = read(debeziumOperations.synthesize(), p0) + val i0 = + ReadInput( + debeziumOperations.generateColdStartProperties(), + debeziumOperations.generateColdStartOffset(), + schemaHistory = null, + isSynthetic = true, + ) + val p0: T = debeziumOperations.position(i0.offset) + val r0: ReadResult = read(i0, p0) Assertions.assertEquals(emptyList(), r0.records) Assertions.assertNotEquals( CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, @@ -105,22 +112,28 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab Update(3, 7), Update(5, 8), ) - val p1: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) + val p1: T = debeziumOperations.position(debeziumOperations.generateColdStartOffset()) container.delete24() val delete = listOf( Delete(2), Delete(4), ) - val p2: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) + val p2: T = debeziumOperations.position(debeziumOperations.generateColdStartOffset()) - val input: DebeziumInput = - debeziumOperations.deserialize(debeziumOperations.serialize(r0.state), listOf(stream)) - val r1: ReadResult = read(input, p1) + Assertions.assertTrue(r0.state is ValidDebeziumWarmStartState) + val i1 = + ReadInput( + debeziumOperations.generateWarmStartProperties(listOf(stream)), + (r0.state as ValidDebeziumWarmStartState).offset, + r0.state.schemaHistory, + isSynthetic = false, + ) + val r1: ReadResult = read(i1, p1) Assertions.assertEquals(insert + update, r1.records.take(insert.size + update.size)) Assertions.assertNotNull(r1.closeReason) - val r2: ReadResult = read(input, p2) + val r2: ReadResult = read(i1, p2) Assertions.assertEquals( insert + update + delete, r2.records.take(insert.size + update.size + delete.size), @@ -133,7 +146,7 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab } private fun read( - input: DebeziumInput, + input: ReadInput, upperBound: T, ): ReadResult { val outputConsumer = BufferingOutputConsumer(ClockFactory().fixed()) @@ -162,7 +175,10 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab streamRecordConsumers, debeziumOperations, upperBound, - input, + input.properties, + input.offset, + input.schemaHistory, + input.isSynthetic, ) Assertions.assertEquals( PartitionReader.TryAcquireResourcesStatus.READY_TO_RUN, @@ -196,14 +212,21 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab Assertions.assertEquals(0, reader.numEventValuesWithoutPosition.get()) return ReadResult( outputConsumer.records().map { Jsons.treeToValue(it.data, Record::class.java) }, - debeziumOperations.deserialize(checkpoint.opaqueStateValue, listOf(stream)).state, + debeziumOperations.deserializeState(checkpoint.opaqueStateValue), reader.closeReasonReference.get(), ) } + data class ReadInput( + val properties: Map, + val offset: DebeziumOffset, + val schemaHistory: DebeziumSchemaHistory?, + val isSynthetic: Boolean, + ) + data class ReadResult( val records: List, - val state: DebeziumState, + val state: DebeziumWarmStartState, val closeReason: CdcPartitionReader.CloseReason?, ) @@ -220,13 +243,13 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab data class Update(override val id: Int, val v: Int) : Record data class Delete(override val id: Int) : Record - abstract inner class AbstractCdcPartitionReaderDebeziumOperationsForTest>( - val stream: Stream - ) : DebeziumOperations { - override fun deserialize( + abstract inner class AbstractDebeziumOperationsForTest> : + DebeziumOperations { + + override fun deserializeRecord( key: DebeziumRecordKey, value: DebeziumRecordValue, - stream: Stream, + stream: Stream ): DeserializedRecord { val id: Int = key.element("id").asInt() val after: Int? = value.after["v"]?.asInt() @@ -252,27 +275,27 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? = stream.id.name - override fun serialize(debeziumState: DebeziumState): OpaqueStateValue = + override fun serializeState( + offset: DebeziumOffset, + schemaHistory: DebeziumSchemaHistory? + ): OpaqueStateValue = Jsons.valueToTree( mapOf( "offset" to - debeziumState.offset.wrapped + offset.wrapped .map { Jsons.writeValueAsString(it.key) to Jsons.writeValueAsString(it.value) } .toMap(), "schemaHistory" to - debeziumState.schemaHistory?.wrapped?.map { + schemaHistory?.wrapped?.map { DocumentWriter.defaultWriter().write(it.document()) }, ), ) - override fun deserialize( - opaqueStateValue: OpaqueStateValue, - streams: List - ): DebeziumInput { + override fun deserializeState(opaqueStateValue: OpaqueStateValue): DebeziumWarmStartState { val offsetNode: ObjectNode = opaqueStateValue["offset"] as ObjectNode val offset = DebeziumOffset( @@ -293,8 +316,7 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab } else { null } - val deserializedStateValue = DebeziumState(offset, schemaHistory) - return DebeziumInput(emptyMap(), deserializedStateValue, false) + return ValidDebeziumWarmStartState(offset, schemaHistory) } } }