Load CDK: Remove Timed Flush Checkpoint Task (#52702)
johnny-schmidt authored Jan 31, 2025
1 parent 1afc861 commit 4a8db25
Showing 7 changed files with 1 addition and 281 deletions.
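In short: DefaultFlushStrategy now flushes on batch size alone, the ForceFlushEvent-driven force-flush path is gone, and TimedForcedCheckpointFlushTask is no longer injected or launched by DefaultDestinationTaskLauncher; the corresponding tests and mocks are removed as well.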
@@ -8,12 +8,9 @@ import com.google.common.collect.Range
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.QueueReader
import io.airbyte.cdk.load.task.internal.ForceFlushEvent
import io.micronaut.context.annotation.Secondary
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap

interface FlushStrategy {
suspend fun shouldFlush(
@@ -26,7 +23,6 @@ interface FlushStrategy {
/**
* Flush whenever
* - bytes consumed >= the configured batch size
* - the current range of indexes being consumed encloses a force flush index
*/
@SuppressFBWarnings(
"NP_NONNULL_PARAM_VIOLATION",
@@ -36,30 +32,12 @@
@Secondary
class DefaultFlushStrategy(
private val config: DestinationConfiguration,
private val eventQueue: QueueReader<ForceFlushEvent>,
@Value("\${airbyte.destination.core.record-batch-size-override}")
private val recordBatchSizeOverride: Long? = null
) : FlushStrategy {
private val forceFlushIndexes = ConcurrentHashMap<DestinationStream.Descriptor, Long>()

override suspend fun shouldFlush(
stream: DestinationStream.Descriptor,
rangeRead: Range<Long>,
bytesProcessed: Long
): Boolean {
if (bytesProcessed >= (recordBatchSizeOverride ?: config.recordBatchSizeBytes)) {
return true
}

// Listen to the event stream for a new force flush index
val nextFlushIndex = eventQueue.poll()?.indexes?.get(stream)

// Always update the index if the new one is not null
return when (
val testIndex = forceFlushIndexes.compute(stream) { _, v -> nextFlushIndex ?: v }
) {
null -> false
else -> rangeRead.contains(testIndex)
}
}
): Boolean = bytesProcessed >= (recordBatchSizeOverride ?: config.recordBatchSizeBytes)
}
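For orientation, here is a minimal sketch of how DefaultFlushStrategy reads after this hunk, assembled from the surviving diff lines; the package declaration and the @Singleton annotation are assumptions, since they fall outside the context shown:

```kotlin
// Sketch only: package and @Singleton are assumed; the rest mirrors the diff above.
package io.airbyte.cdk.load.state

import com.google.common.collect.Range
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.micronaut.context.annotation.Secondary
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton

interface FlushStrategy {
    suspend fun shouldFlush(
        stream: DestinationStream.Descriptor,
        rangeRead: Range<Long>,
        bytesProcessed: Long,
    ): Boolean
}

/** Flush whenever bytes consumed >= the configured batch size. */
@Singleton
@Secondary
class DefaultFlushStrategy(
    private val config: DestinationConfiguration,
    @Value("\${airbyte.destination.core.record-batch-size-override}")
    private val recordBatchSizeOverride: Long? = null,
) : FlushStrategy {
    // The force-flush index bookkeeping is gone; size is the only trigger.
    override suspend fun shouldFlush(
        stream: DestinationStream.Descriptor,
        rangeRead: Range<Long>,
        bytesProcessed: Long,
    ): Boolean = bytesProcessed >= (recordBatchSizeOverride ?: config.recordBatchSizeBytes)
}
```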
@@ -31,7 +31,6 @@ import io.airbyte.cdk.load.task.internal.FlushTickTask
import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory
import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow
import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask
import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask
import io.airbyte.cdk.load.util.setOnce
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -113,7 +112,6 @@ class DefaultDestinationTaskLauncher(

// Checkpoint Tasks
private val flushCheckpointsTaskFactory: FlushCheckpointsTaskFactory,
private val timedCheckpointFlushTask: TimedForcedCheckpointFlushTask,
private val updateCheckpointsTask: UpdateCheckpointsTask,

// Exception handling
Expand Down Expand Up @@ -231,10 +229,6 @@ class DefaultDestinationTaskLauncher(
log.info { "Starting timed file aggregate flush task " }
launch(flushTickTask)

// Start the checkpoint management tasks
log.info { "Starting timed checkpoint flush task" }
launch(timedCheckpointFlushTask)

log.info { "Starting checkpoint update task" }
launch(updateCheckpointsTask)


This file was deleted.

@@ -7,14 +7,8 @@ package io.airbyte.cdk.load.state
import com.google.common.collect.Range
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream2
import io.airbyte.cdk.load.message.ChannelMessageQueue
import io.airbyte.cdk.load.task.internal.ForceFlushEvent
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Singleton
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
@@ -32,11 +26,6 @@ class DefaultFlushStrategyTest {
@Value("\${airbyte.destination.core.record-batch-size-override}")
private var recordBatchSizeOverride: Long? = null

@Singleton
@Primary
@Requires(env = ["FlushStrategyTest"])
class MockForceFlushEventQueue : ChannelMessageQueue<ForceFlushEvent>()

@Test
fun testFlushByByteSize(flushStrategy: DefaultFlushStrategy, config: DestinationConfiguration) =
runTest {
@@ -62,58 +51,4 @@
)
)
}

@Test
fun testFlushByIndex(
flushStrategy: DefaultFlushStrategy,
config: DestinationConfiguration,
forceFlushEventProducer: MockForceFlushEventQueue,
) = runTest {
// Ensure the size trigger is not a factor
val insufficientSize = (recordBatchSizeOverride ?: config.recordBatchSizeBytes) - 1L

Assertions.assertFalse(
flushStrategy.shouldFlush(stream1.descriptor, Range.all(), insufficientSize),
"Should not flush even with whole range if no event"
)

forceFlushEventProducer.publish(ForceFlushEvent(mapOf(stream1.descriptor to 42L)))
Assertions.assertFalse(
flushStrategy.shouldFlush(stream1.descriptor, Range.closed(0, 41), insufficientSize),
"Should not flush if index is not in range"
)
Assertions.assertTrue(
flushStrategy.shouldFlush(stream1.descriptor, Range.closed(0, 42), insufficientSize),
"Should flush if index is in range"
)

Assertions.assertFalse(
flushStrategy.shouldFlush(stream2.descriptor, Range.closed(0, 42), insufficientSize),
"Should not flush other streams"
)
forceFlushEventProducer.publish(ForceFlushEvent(mapOf(stream2.descriptor to 200L)))
Assertions.assertTrue(
flushStrategy.shouldFlush(stream2.descriptor, Range.closed(0, 200), insufficientSize),
"(Unless they also have flush points)"
)

Assertions.assertTrue(
flushStrategy.shouldFlush(stream1.descriptor, Range.closed(42, 100), insufficientSize),
"Should flush even if barely in range"
)
Assertions.assertFalse(
flushStrategy.shouldFlush(stream1.descriptor, Range.closed(43, 100), insufficientSize),
"Should not flush if index has been passed"
)

forceFlushEventProducer.publish(ForceFlushEvent(mapOf(stream1.descriptor to 100L)))
Assertions.assertFalse(
flushStrategy.shouldFlush(stream1.descriptor, Range.closed(0, 42), insufficientSize),
"New events indexes should invalidate old ones"
)
Assertions.assertTrue(
flushStrategy.shouldFlush(stream1.descriptor, Range.closed(43, 100), insufficientSize),
"New event indexes should be honored"
)
}
}
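The body of the surviving testFlushByByteSize is collapsed in this view; a hedged sketch of the kind of assertions it makes, reusing the shouldFlush signature shown above (stream1 and the exact boundary values are assumptions, not the actual test body):

```kotlin
@Test
fun testFlushByByteSize(
    flushStrategy: DefaultFlushStrategy,
    config: DestinationConfiguration
) = runTest {
    val threshold = recordBatchSizeOverride ?: config.recordBatchSizeBytes
    // Below the configured batch size: no flush, regardless of the range read.
    Assertions.assertFalse(
        flushStrategy.shouldFlush(stream1.descriptor, Range.all(), threshold - 1L),
        "Should not flush below the configured batch size"
    )
    // At or above the batch size: flush.
    Assertions.assertTrue(
        flushStrategy.shouldFlush(stream1.descriptor, Range.all(), threshold),
        "Should flush at or above the configured batch size"
    )
}
```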
@@ -47,7 +47,6 @@ import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory
import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow
import io.airbyte.cdk.load.task.internal.SpillToDiskTask
import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask
import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Replaces
@@ -88,7 +87,6 @@ class DestinationTaskLauncherTest {
@Inject lateinit var closeStreamTaskFactory: MockCloseStreamTaskFactory
@Inject lateinit var teardownTaskFactory: MockTeardownTaskFactory
@Inject lateinit var flushCheckpointsTaskFactory: MockFlushCheckpointsTaskFactory
@Inject lateinit var mockForceFlushTask: MockForceFlushTask
@Inject lateinit var updateCheckpointsTask: MockUpdateCheckpointsTask
@Inject lateinit var inputFlow: ReservingDeserializingInputFlow
@Inject lateinit var queueWriter: MockQueueWriter
@@ -279,19 +277,6 @@
}
}

@Singleton
@Primary
@Requires(env = ["DestinationTaskLauncherTest"])
class MockForceFlushTask : TimedForcedCheckpointFlushTask {
override val terminalCondition: TerminalCondition = SelfTerminating

val didRun = Channel<Boolean>(Channel.UNLIMITED)

override suspend fun execute() {
didRun.send(true)
}
}

@Singleton
@Primary
@Requires(env = ["DestinationTaskLauncherTest"])
@@ -366,9 +351,6 @@

coVerify(exactly = config.numProcessBatchWorkers) { processBatchTaskFactory.make(any()) }

// Verify that we kicked off the timed force flush w/o a specific delay
Assertions.assertTrue(mockForceFlushTask.didRun.receive())

Assertions.assertTrue(
updateCheckpointsTask.didRun.receive(),
"update checkpoints task was started"
@@ -35,7 +35,6 @@ import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory
import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow
import io.airbyte.cdk.load.task.internal.SpillToDiskTask
import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask
import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask
import io.mockk.Called
import io.mockk.coEvery
@@ -68,7 +67,6 @@ class DestinationTaskLauncherUTest {

// Checkpoint Tasks
private val flushCheckpointsTaskFactory: FlushCheckpointsTaskFactory = mockk(relaxed = true)
private val timedFlushTask: TimedForcedCheckpointFlushTask = mockk(relaxed = true)
private val updateCheckpointsTask: UpdateCheckpointsTask = mockk(relaxed = true)
private val config: DestinationConfiguration = mockk(relaxed = true)

@@ -105,7 +103,6 @@
closeStreamTaskFactory,
teardownTaskFactory,
flushCheckpointsTaskFactory,
timedFlushTask,
updateCheckpointsTask,
failStreamTaskFactory,
failSyncTaskFactory,