Skip to content

Commit

Permalink
Bulk Load CDK: Do not fail on null stream state (#49979)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Dec 21, 2024
1 parent 1bd65af commit cb436fa
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,14 @@ sealed interface CheckpointMessage : DestinationMessage {
data class Stats(val recordCount: Long)
data class Checkpoint(
val stream: DestinationStream.Descriptor,
val state: JsonNode,
val state: JsonNode?,
) {
fun asProtocolObject(): AirbyteStreamState =
AirbyteStreamState()
.withStreamDescriptor(stream.asProtocolObject())
.withStreamState(state)
AirbyteStreamState().withStreamDescriptor(stream.asProtocolObject()).also {
if (state != null) {
it.streamState = state
}
}
}

val sourceStats: Stats?
Expand Down Expand Up @@ -503,7 +505,7 @@ class DestinationMessageFactory(
val descriptor = streamState.streamDescriptor
return Checkpoint(
stream = DestinationStream.Descriptor(descriptor.namespace, descriptor.name),
state = streamState.streamState
state = runCatching { streamState.streamState }.getOrNull()
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
Expand Down Expand Up @@ -283,4 +284,21 @@ class DestinationMessageTest {
.map { Arguments.of(it) }
}
}

@Test
fun testNullStreamState() {
val inputMessage =
AirbyteMessage()
.withType(AirbyteMessage.Type.STATE)
.withState(
AirbyteStateMessage()
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(
AirbyteStreamState().withStreamDescriptor(descriptor.asProtocolObject())
)
.withSourceStats(AirbyteStateStats().withRecordCount(2.0))
)

assertDoesNotThrow { convert(factory(false), inputMessage) as StreamCheckpoint }
}
}

0 comments on commit cb436fa

Please sign in to comment.