diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt index 04044cbc6e41e..5d51dbab3d595 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt @@ -78,6 +78,10 @@ data class DestinationStream( } } } + + fun shouldBeTruncatedAtEndOfSync(): Boolean { + return importType is Overwrite || minimumGenerationId == generationId + } } @Singleton diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/DestinationStateManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/DestinationStateManager.kt index 7d3195e415580..25c48158b61b3 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/DestinationStateManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/DestinationStateManager.kt @@ -33,10 +33,7 @@ class DefaultDestinationStateManager( } override suspend fun persistState(stream: DestinationStream) { - val state = - states[stream.descriptor] - ?: throw IllegalStateException("State not found for stream $stream") - persister.persist(stream, state) + states[stream.descriptor]?.let { persister.persist(stream, it) } } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index 3bb5aa3e53a62..788cce2af7126 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -111,7 +111,6 @@ class DefaultInputConsumerTask( is DestinationFileStreamComplete -> { reserved.release() // safe because multiple calls conflate manager.markEndOfStream(true) - fileTransferQueue.close() val envelope = BatchEnvelope( SimpleBatch(Batch.State.COMPLETE), @@ -196,6 +195,7 @@ class DefaultInputConsumerTask( } finally { log.info { "Closing record queues" } catalog.streams.forEach { recordQueueSupplier.get(it.descriptor).close() } + fileTransferQueue.close() } } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index a8d608b16b598..607b3976e790b 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -4,8 +4,6 @@ package io.airbyte.cdk.load.state.object_storage -import com.fasterxml.jackson.annotation.JsonIgnore -import com.fasterxml.jackson.annotation.JsonProperty import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient @@ -13,233 +11,115 @@ import io.airbyte.cdk.load.file.object_storage.PathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.state.DestinationState import io.airbyte.cdk.load.state.DestinationStatePersister -import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState.Companion.OPTIONAL_ORDINAL_SUFFIX_PATTERN -import io.airbyte.cdk.load.util.readIntoClass -import io.airbyte.cdk.load.util.serializeToJsonBytes import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Factory -import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -import java.nio.file.Paths import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.fold import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") class ObjectStorageDestinationState( - // (State -> (GenerationId -> (Key -> PartNumber))) - @JsonProperty("generations_by_state") - var generationMap: - ConcurrentHashMap>> = - ConcurrentHashMap(), - @JsonProperty("count_by_key") var countByKey: MutableMap = mutableMapOf() + private val stream: DestinationStream, + private val client: ObjectStorageClient<*>, + private val pathFactory: PathFactory, ) : DestinationState { - enum class State { - STAGED, - FINALIZED - } + private val log = KotlinLogging.logger {} - @JsonIgnore private val countByKeyLock = Mutex() + private val countByKey: ConcurrentHashMap = ConcurrentHashMap() + private val fileNumbersByPath: ConcurrentHashMap = ConcurrentHashMap() + private val matcher = + pathFactory.getPathMatcher(stream, suffixPattern = OPTIONAL_ORDINAL_SUFFIX_PATTERN) companion object { const val METADATA_GENERATION_ID_KEY = "ab-generation-id" - const val STREAM_NAMESPACE_KEY = "ab-stream-namespace" - const val STREAM_NAME_KEY = "ab-stream-name" const val OPTIONAL_ORDINAL_SUFFIX_PATTERN = "(-[0-9]+)?" fun metadataFor(stream: DestinationStream): Map = mapOf(METADATA_GENERATION_ID_KEY to stream.generationId.toString()) } - suspend fun addObject( - generationId: Long, - key: String, - partNumber: Long?, - isStaging: Boolean = false - ) { - val state = if (isStaging) State.STAGED else State.FINALIZED - generationMap - .getOrPut(state) { ConcurrentHashMap() } - .getOrPut(generationId) { ConcurrentHashMap() }[key] = partNumber ?: 0L - } - - suspend fun removeObject(generationId: Long, key: String, isStaging: Boolean = false) { - val state = if (isStaging) State.STAGED else State.FINALIZED - generationMap[state]?.get(generationId)?.remove(key) - } - - suspend fun dropGenerationsBefore(minimumGenerationId: Long) { - State.entries.forEach { state -> - (0 until minimumGenerationId).forEach { generationMap[state]?.remove(it) } + /** + * Returns (generationId, object) for all objects that should be cleaned up. + * + * "should be cleaned up" means + * * stream.shouldBeTruncatedAtEndOfSync() is true + * * object's generation id exists and is less than stream.minimumGenerationId + */ + suspend fun getObjectsToDelete(): List>> { + if (!stream.shouldBeTruncatedAtEndOfSync()) { + return emptyList() } - } - - data class Generation( - val isStaging: Boolean, - val generationId: Long, - val objects: List - ) - data class ObjectAndPart( - val key: String, - val partNumber: Long, - ) - - suspend fun getGenerations(): Sequence = - generationMap.entries - .asSequence() - .map { (state, gens) -> - val isStaging = state == State.STAGED - gens.map { (generationId, objects) -> - Generation( - isStaging, - generationId, - objects.map { (key, partNumber) -> ObjectAndPart(key, partNumber) } - ) + return client + .list(pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false)) + .filter { matcher.match(it.key) != null } + .mapNotNull { obj -> + val generationId = + client.getMetadata(obj.key)[METADATA_GENERATION_ID_KEY]?.toLongOrNull() ?: 0L + if (generationId < stream.minimumGenerationId) { + Pair(generationId, obj) + } else { + null } } - .flatten() - - suspend fun getNextPartNumber(): Long = - getGenerations().flatMap { it.objects }.map { it.partNumber }.maxOrNull()?.plus(1) ?: 0L - - /** Returns generationId -> objectAndPart for all staged objects that should be kept. */ - suspend fun getStagedObjectsToFinalize( - minimumGenerationId: Long - ): Sequence> = - getGenerations() - .filter { it.isStaging && it.generationId >= minimumGenerationId } - .flatMap { it.objects.map { obj -> it.generationId to obj } } + .toList() + } /** - * Returns generationId -> objectAndPart for all objects (staged and unstaged) that should be - * cleaned up. + * Ensures the key is unique by appending `-${max_suffix + 1}` if there is a conflict. If the + * key is unique, it is returned as-is. */ - suspend fun getObjectsToDelete(minimumGenerationId: Long): Sequence> { - val (toKeep, toDrop) = getGenerations().partition { it.generationId >= minimumGenerationId } - val keepKeys = toKeep.flatMap { it.objects.map { obj -> obj.key } }.toSet() - return toDrop.asSequence().flatMap { - it.objects.filter { obj -> obj.key !in keepKeys }.map { obj -> it.generationId to obj } - } - } - - /** Used to guarantee the uniqueness of a key */ suspend fun ensureUnique(key: String): String { - val ordinal = - countByKeyLock.withLock { - countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } - } - ?: 0L - return if (ordinal > 0L) { - "$key-$ordinal" - } else { + val count = + countByKey + .getOrPut(key) { + client + .list(key) + .mapNotNull { matcher.match(it.key) } + .fold(-1L) { acc, match -> + maxOf(match.customSuffix?.removePrefix("-")?.toLongOrNull() ?: 0L, acc) + } + .let { AtomicLong(it) } + } + .incrementAndGet() + + return if (count == 0L) { key + } else { + "$key-$count" } } -} -@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") -class ObjectStorageStagingPersister( - private val client: ObjectStorageClient<*>, - private val pathFactory: PathFactory -) : DestinationStatePersister { - private val log = KotlinLogging.logger {} - private val fallbackPersister = ObjectStorageFallbackPersister(client, pathFactory) - - companion object { - const val STATE_FILENAME = "__airbyte_state.json" - } - - private fun keyFor(stream: DestinationStream): String = - Paths.get(pathFactory.getStagingDirectory(stream), STATE_FILENAME).toString() - - override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState { - val key = keyFor(stream) - try { - log.info { "Loading destination state from $key" } - return client.get(key) { inputStream -> - inputStream.readIntoClass(ObjectStorageDestinationState::class.java) - } - } catch (e: Exception) { - log.info { "No destination state found at $key: $e; falling back to metadata search" } - return fallbackPersister.load(stream) + /** Returns a shared atomic long referencing the max {part_number} for any given path. */ + suspend fun getPartIdCounter(path: String): AtomicLong { + return fileNumbersByPath.getOrPut(path) { + client + .list(path) + .mapNotNull { matcher.match(it.key) } + .fold(-1L) { acc, match -> maxOf(match.partNumber ?: 0L, acc) } + .let { AtomicLong(it) } } } - - override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) { - client.put(keyFor(stream), state.serializeToJsonBytes()) - } } +/** + * Note: there's no persisting yet. This will require either a client-provided path to store data or + * a guaranteed sortable set of file names so that we can send the high watermark to the platform. + */ @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") +@Singleton class ObjectStorageFallbackPersister( private val client: ObjectStorageClient<*>, private val pathFactory: PathFactory ) : DestinationStatePersister { - private val log = KotlinLogging.logger {} override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState { - // Add a suffix matching an OPTIONAL -[0-9]+ ordinal - val matcher = - pathFactory.getPathMatcher(stream, suffixPattern = OPTIONAL_ORDINAL_SUFFIX_PATTERN) - val longestUnambiguous = - pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false) - log.info { - "Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata" - } - val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList() - - /* Initialize the unique key counts. */ - val countByKey = mutableMapOf() - matches.forEach { - val key = it.path.replace(Regex("-[0-9]+$"), "") - val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0 - countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) } - } - - /* Build (generationId -> (key -> fileNumber)). */ - val generationIdToKeyAndFileNumber = - ConcurrentHashMap( - matches - .groupBy { - client - .getMetadata(it.path)[ - ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY] - ?.toLong() - ?: 0L - } - .mapValues { (_, matches) -> - ConcurrentHashMap(matches.associate { it.path to (it.partNumber ?: 0L) }) - } - ) - - return ObjectStorageDestinationState( - ConcurrentHashMap( - mapOf( - ObjectStorageDestinationState.State.FINALIZED to generationIdToKeyAndFileNumber - ) - ), - countByKey - ) + return ObjectStorageDestinationState(stream, client, pathFactory) } override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) { // No-op; state is persisted when the generation id is set on the object metadata } } - -@Factory -class ObjectStorageDestinationStatePersisterFactory>( - private val client: ObjectStorageClient, - private val pathFactory: PathFactory -) { - @Singleton - @Secondary - fun create(): DestinationStatePersister = - if (pathFactory.supportsStaging) { - ObjectStorageStagingPersister(client, pathFactory) - } else { - ObjectStorageFallbackPersister(client, pathFactory) - } -} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 3325e9e6597a9..5cc0556335a28 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -18,8 +18,6 @@ import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.message.object_storage.* -import io.airbyte.cdk.load.message.object_storage.LoadedObject -import io.airbyte.cdk.load.message.object_storage.ObjectStorageBatch import io.airbyte.cdk.load.state.DestinationStateManager import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState @@ -32,7 +30,6 @@ import io.micronaut.context.annotation.Value import jakarta.inject.Singleton import java.io.File import java.io.OutputStream -import java.util.concurrent.atomic.AtomicLong @Singleton @Secondary @@ -79,28 +76,20 @@ class ObjectStorageStreamLoader, U : OutputStream>( ) : StreamLoader { private val log = KotlinLogging.logger {} - // Used for naming files. Distinct from part index, which is used to track uploads. - private val fileNumber = AtomicLong(0L) private val objectAccumulator = PartToObjectAccumulator(stream, client) - override suspend fun start() { - val state = destinationStateManager.getState(stream) - // This is the number used to populate {part_number} on the object path. - // We'll call it file number here to avoid confusion with the part index used for uploads. - val fileNumber = state.getNextPartNumber() - log.info { "Got next file number from destination state: $fileNumber" } - this.fileNumber.set(fileNumber) - } - override suspend fun createBatchAccumulator(): BatchAccumulator { + val state = destinationStateManager.getState(stream) return RecordToPartAccumulator( pathFactory, bufferedWriterFactory, partSizeBytes = partSizeBytes, fileSizeBytes = fileSizeBytes, stream, - fileNumber - ) { name -> destinationStateManager.getState(stream).ensureUnique(name) } + state.getPartIdCounter(pathFactory.getFinalDirectory(stream)), + ) { name -> + state.ensureUnique(name) + } } override suspend fun createFileBatchAccumulator( @@ -109,60 +98,24 @@ class ObjectStorageStreamLoader, U : OutputStream>( @VisibleForTesting fun createFile(url: String) = File(url) - override suspend fun processBatch(batch: Batch): Batch { - val nextBatch = objectAccumulator.processBatch(batch) as ObjectStorageBatch - when (nextBatch) { - is LoadedObject<*> -> { - // Mark that we've completed the upload and persist the state before returning the - // persisted batch. - // Otherwise, we might lose track of the upload if the process crashes before - // persisting. - // TODO: Migrate all state bookkeeping to the CDK if possible - val state = destinationStateManager.getState(stream) - state.addObject( - stream.generationId, - nextBatch.remoteObject.key, - nextBatch.fileNumber, - isStaging = pathFactory.supportsStaging - ) - destinationStateManager.persistState(stream) - } - else -> {} // Do nothing - } - return nextBatch - } + override suspend fun processBatch(batch: Batch): Batch = objectAccumulator.processBatch(batch) override suspend fun close(streamFailure: StreamProcessingFailed?) { if (streamFailure != null) { log.info { "Sync failed, persisting destination state for next run" } - destinationStateManager.persistState(stream) - } else { + } else if (stream.shouldBeTruncatedAtEndOfSync()) { + log.info { "Truncate sync succeeded, Removing old files" } val state = destinationStateManager.getState(stream) - log.info { "Sync succeeded, Removing old files" } - state.getObjectsToDelete(stream.minimumGenerationId).forEach { - (generationId, objectAndPart) -> + + state.getObjectsToDelete().forEach { (generationId, objectAndPart) -> log.info { "Deleting old object for generation $generationId: ${objectAndPart.key}" } client.delete(objectAndPart.key) - state.removeObject(generationId, objectAndPart.key) - } - - log.info { "Moving all current data out of staging" } - state.getStagedObjectsToFinalize(stream.minimumGenerationId).forEach { - (generationId, objectAndPart) -> - val newKey = - pathFactory.getPathToFile(stream, objectAndPart.partNumber, isStaging = false) - log.info { - "Moving staged object of generation $generationId: ${objectAndPart.key} to $newKey" - } - val newObject = client.move(objectAndPart.key, newKey) - state.removeObject(generationId, objectAndPart.key, isStaging = true) - state.addObject(generationId, newObject.key, objectAndPart.partNumber) } log.info { "Persisting state" } - destinationStateManager.persistState(stream) } + destinationStateManager.persistState(stream) } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt index cedba80734af4..dace691b97584 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt @@ -46,7 +46,7 @@ class RecordToPartAccumulator( // Start a new object if there is not one in progress. val partialUpload = currentObject.getOrPut(key) { - val fileNo = fileNumber.getAndIncrement() + val fileNo = fileNumber.incrementAndGet() ObjectInProgress( partFactory = PartFactory( diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt deleted file mode 100644 index 141b4ad393cb5..0000000000000 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.state.object_storage - -import io.airbyte.cdk.load.MockObjectStorageClient -import io.airbyte.cdk.load.MockPathFactory -import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.command.MockDestinationCatalogFactory -import io.airbyte.cdk.load.file.NoopProcessor -import io.airbyte.cdk.load.state.DestinationStateManager -import io.micronaut.context.annotation.Primary -import io.micronaut.context.annotation.Property -import io.micronaut.context.annotation.Requires -import io.micronaut.test.extensions.junit5.annotation.MicronautTest -import jakarta.inject.Singleton -import java.nio.file.Paths -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Assertions -import org.junit.jupiter.api.Nested -import org.junit.jupiter.api.Test - -class ObjectStorageDestinationStateTest { - @Singleton - @Requires(env = ["ObjectStorageDestinationStateTest"]) - data class Dependencies( - val stateManager: DestinationStateManager, - val mockClient: MockObjectStorageClient, - val pathFactory: MockPathFactory - ) - - companion object { - val stream1 = MockDestinationCatalogFactory.stream1 - const val PERSISTED = - """{"generations_by_state":{"FINALIZED":{"0":{"key1":0,"key2":1},"1":{"key3":0,"key4":1}}},"count_by_key":{}}""" - } - - @Singleton - @Primary - @Requires(property = "object-storage-destination-state-test.use-staging", value = "true") - class MockPathFactoryWithStaging : MockPathFactory() { - override var doSupportStaging = true - } - - @Singleton - @Primary - @Requires(property = "object-storage-destination-state-test.use-staging", value = "false") - class MockPathFactoryWithoutStaging : MockPathFactory() { - override var doSupportStaging = false - } - - @Nested - @MicronautTest( - rebuildContext = true, - environments = - [ - "ObjectStorageDestinationStateTest", - "MockObjectStorageClient", - "MockDestinationCatalog", - ], - ) - @Property(name = "object-storage-destination-state-test.use-staging", value = "true") - inner class ObjectStorageDestinationStateTestStaging { - @Test - fun testBasicLifecycle(d: Dependencies) = runTest { - val state = d.stateManager.getState(stream1) - Assertions.assertEquals( - emptyList(), - state.getGenerations().toList(), - "state should initially be empty" - ) - state.addObject(0, "key1", 0) - state.addObject(0, "key2", 1) - state.addObject(1, "key3", 0) - state.addObject(1, "key4", 1) - Assertions.assertEquals( - 4, - state.getGenerations().flatMap { it.objects }.toList().size, - "state should contain 4 objects" - ) - - d.stateManager.persistState(stream1) - val obj = d.mockClient.list("").toList().first() - val data = d.mockClient.get(obj.key) { it.readBytes() } - Assertions.assertEquals( - PERSISTED, - data.toString(Charsets.UTF_8), - "state should be persisted" - ) - - state.removeObject(0, "key1") - state.removeObject(0, "key2") - state.removeObject(1, "key3") - state.removeObject(1, "key4") - Assertions.assertEquals( - emptyList(), - state.getGenerations().flatMap { it.objects }.toList(), - "objects should be removed" - ) - - val fetchedState = d.stateManager.getState(stream1) - Assertions.assertEquals( - 0, - fetchedState.getGenerations().flatMap { it.objects }.toList().size, - "state should still contain 0 objects (managed state is in cache)" - ) - } - - @Test - fun testLoadingExistingState(d: Dependencies) = runTest { - val key = - Paths.get( - d.pathFactory.getStagingDirectory(stream1), - ObjectStorageStagingPersister.STATE_FILENAME - ) - .toString() - d.mockClient.put(key, PERSISTED.toByteArray()) - val state = d.stateManager.getState(stream1) - Assertions.assertEquals( - listOf( - ObjectStorageDestinationState.Generation( - false, - 0, - listOf( - ObjectStorageDestinationState.ObjectAndPart("key1", 0), - ObjectStorageDestinationState.ObjectAndPart("key2", 1) - ) - ), - ObjectStorageDestinationState.Generation( - false, - 1, - listOf( - ObjectStorageDestinationState.ObjectAndPart("key3", 0), - ObjectStorageDestinationState.ObjectAndPart("key4", 1) - ) - ) - ), - state.getGenerations().toList(), - "state should be loaded from storage" - ) - - Assertions.assertEquals(2L, state.getNextPartNumber()) - } - - @Test - fun testFallbackToMetadataState(d: Dependencies) = runTest { - val generations = - ObjectStorageDestinationStateTestWithoutStaging().loadMetadata(d, stream1) - val state = d.stateManager.getState(stream1) - ObjectStorageDestinationStateTestWithoutStaging().validateMetadata(state, generations) - Assertions.assertEquals(2L, state.getNextPartNumber()) - } - - @Test - fun testGetObjectsToMoveAndDelete(d: Dependencies) = runTest { - val state = d.stateManager.getState(stream1) - state.addObject(generationId = 0L, "old-finalized", partNumber = 0L, isStaging = false) - state.addObject(generationId = 1L, "new-finalized", partNumber = 1L, isStaging = false) - state.addObject( - generationId = 0L, - "leftover-old-staging", - partNumber = 2L, - isStaging = true - ) - state.addObject(generationId = 1L, "new-staging", partNumber = 3L, isStaging = true) - val toFinalize = - state - .getStagedObjectsToFinalize(minimumGenerationId = 1L) - .map { it.first to it.second } - .toSet() - - Assertions.assertEquals( - setOf(1L to ObjectStorageDestinationState.ObjectAndPart("new-staging", 3L)), - toFinalize, - "only new-staging should be finalized" - ) - - val toDelete = - state - .getObjectsToDelete(minimumGenerationId = 1L) - .map { it.first to it.second } - .toSet() - Assertions.assertEquals( - setOf( - 0L to ObjectStorageDestinationState.ObjectAndPart("old-finalized", 0L), - 0L to ObjectStorageDestinationState.ObjectAndPart("leftover-old-staging", 2L) - ), - toDelete, - "all old objects should be deleted" - ) - } - } - - @Nested - @MicronautTest( - environments = - [ - "ObjectStorageDestinationStateTest", - "MockObjectStorageClient", - "MockDestinationCatalog", - ], - ) - @Property(name = "object-storage-destination-state-test.use-staging", value = "false") - inner class ObjectStorageDestinationStateTestWithoutStaging { - suspend fun loadMetadata( - d: Dependencies, - stream: DestinationStream - ): List> { - val genIdKey = ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY - val prefix = - "${d.pathFactory.finalPrefix}/${stream.descriptor.namespace}/${stream.descriptor.name}" - val generations = - listOf( - Triple(0, "$prefix/key1-0", 0L), - Triple(0, "$prefix/key2-1", 1L), - Triple(1, "$prefix/key3-0", 0L), - Triple(1, "$prefix/key4-1", 1L) - ) - generations.forEach { (genId, key, _) -> - d.mockClient.streamingUpload( - key, - mapOf(genIdKey to genId.toString()), - NoopProcessor - ) { it.write(0) } - } - return generations - } - - fun validateMetadata( - state: ObjectStorageDestinationState, - generations: List> - ) = runTest { - Assertions.assertEquals( - generations - .groupBy { it.first } - .map { (generationId, triples) -> - ObjectStorageDestinationState.Generation( - false, - generationId.toLong(), - triples - .map { (_, key, partNumber) -> - ObjectStorageDestinationState.ObjectAndPart(key, partNumber) - } - .sortedByDescending { - // Brittle hack to get the order to line up - it.key.contains("key2") || it.key.contains("key4") - } - .toMutableList() - ) - }, - state.getGenerations().toList().sortedBy { it.generationId }, - "state should be recovered from metadata" - ) - } - - @Test - fun testRecoveringFromMetadata(d: Dependencies) = runTest { - val generations = loadMetadata(d, stream1) - val state = d.stateManager.getState(stream1) - validateMetadata(state, generations) - Assertions.assertEquals(2L, state.getNextPartNumber()) - } - } -} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt index d545c550a24fe..9d382fbdfe5b7 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt @@ -12,7 +12,10 @@ import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.mockk.coEvery import io.mockk.every import io.mockk.impl.annotations.MockK -import kotlinx.coroutines.flow.flowOf +import io.mockk.mockk +import java.util.concurrent.ConcurrentLinkedQueue +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -29,38 +32,141 @@ class ObjectStorageDestinationStateUTest { @BeforeEach fun setup() { every { stream.descriptor } returns DestinationStream.Descriptor("test", "stream") - every { pathFactory.getPathMatcher(any(), any()) } answers - { - val suffix = secondArg() - PathMatcher(Regex("([a-z]+)$suffix"), mapOf("suffix" to 2)) - } - every { pathFactory.getLongestStreamConstantPrefix(any(), any()) } returns "prefix/" + every { pathFactory.getLongestStreamConstantPrefix(any(), any()) } returns "" } @Test - fun `test that the fallback persister correctly infers the unique key to ordinal count`() = - runTest { - coEvery { client.list(any()) } returns - flowOf( + fun `test inferring unique key`() = runTest { + val mockObjects = + ConcurrentLinkedQueue( + listOf( MockObj("dog"), MockObj("dog-1"), MockObj("dog-3"), MockObj("cat"), - MockObj("turtle-100") + MockObj("turtle-1-100") + ) + ) + coEvery { client.list(any()) } answers + { + val prefix = firstArg() + mockObjects.asFlow().filter { it.key.startsWith(prefix) } + } + + every { pathFactory.getPathMatcher(any(), any()) } answers + { + val suffix = secondArg() + PathMatcher(Regex("(dog|cat|turtle-1)$suffix"), mapOf("suffix" to 2)) + } + + val persister = ObjectStorageFallbackPersister(client, pathFactory) + val state = persister.load(stream) + + assertEquals("dog-4", state.ensureUnique("dog")) + assertEquals("dog-5", state.ensureUnique("dog")) + assertEquals("cat-1", state.ensureUnique("cat")) + assertEquals("turtle-1-101", state.ensureUnique("turtle-1")) + assertEquals("turtle-1-102", state.ensureUnique("turtle-1")) + assertEquals("spider", state.ensureUnique("spider")) + } + + @Test + fun `test inferring part number`() = runTest { + val mockObjects = + ConcurrentLinkedQueue( + listOf( + MockObj("dog/file.0.jsonl"), + MockObj("dog/file.1.jsonl"), + MockObj("dog/file.2.jsonl"), + MockObj("cat/file.1.jsonl"), + MockObj("turtle-1/file.100.jsonl") + ) + ) + coEvery { client.list(any()) } answers + { + val prefix = firstArg() + mockObjects.asFlow().filter { it.key.startsWith(prefix) } + } + + every { pathFactory.getPathMatcher(any(), any()) } answers + { + val suffix = secondArg() + PathMatcher( + Regex("(dog|cat|turtle-1)/file\\.([0-9]+)\\.(jsonl)$suffix"), + mapOf("part_number" to 2, "suffix" to 4) ) - coEvery { client.getMetadata(any()) } returns mapOf("ab-generation-id" to "1") - - val persister = ObjectStorageFallbackPersister(client, pathFactory) - val state = persister.load(stream) - assertEquals(state.countByKey["dog"], 3L) - assertEquals(state.countByKey["cat"], 0L) - assertEquals(state.countByKey["turtle"], 100L) - - assertEquals(state.ensureUnique("dog"), "dog-4") - assertEquals(state.ensureUnique("dog"), "dog-5") - assertEquals(state.ensureUnique("cat"), "cat-1") - assertEquals(state.ensureUnique("turtle"), "turtle-101") - assertEquals(state.ensureUnique("turtle"), "turtle-102") - assertEquals(state.ensureUnique("spider"), "spider") - } + } + + val persister = ObjectStorageFallbackPersister(client, pathFactory) + val state = persister.load(stream) + + assertEquals(2L, state.getPartIdCounter("dog/").get()) + assertEquals(1L, state.getPartIdCounter("cat/").get()) + assertEquals(100L, state.getPartIdCounter("turtle-1/").get()) + assertEquals(-1L, state.getPartIdCounter("spider/").get()) + } + + @Test + fun `test get objects to delete`() = runTest { + val mockObjects = + ConcurrentLinkedQueue( + listOf( + MockObj("dog/1"), + MockObj("dog/2"), + MockObj("dog/3"), + MockObj("cat/1"), + MockObj("cat/2"), + MockObj("cat/3"), + MockObj("turtle-1/1"), + MockObj("turtle-1/2") + ) + ) + coEvery { client.list(any()) } answers + { + val prefix = firstArg() + mockObjects.asFlow().filter { it.key.startsWith(prefix) } + } + + every { pathFactory.getPathMatcher(any(), any()) } answers + { + val stream = firstArg() + val suffix = secondArg() + PathMatcher( + Regex("(${stream.descriptor.name})/([0-9]+)$suffix"), + mapOf("suffix" to 3) + ) + } + + coEvery { client.getMetadata(any()) } answers + { + val key = firstArg() + mapOf("ab-generation-id" to key.split("/").last()) + } + + val persister = ObjectStorageFallbackPersister(client, pathFactory) + + val dogStream = mockk(relaxed = true) + every { dogStream.descriptor } returns DestinationStream.Descriptor("test", "dog") + every { dogStream.minimumGenerationId } returns 0L + every { dogStream.shouldBeTruncatedAtEndOfSync() } returns true + val dogState = persister.load(dogStream) + assertEquals(0, dogState.getObjectsToDelete().size) + + val catStream = mockk(relaxed = true) + every { catStream.descriptor } returns DestinationStream.Descriptor("test", "cat") + every { catStream.minimumGenerationId } returns 3L + every { catStream.shouldBeTruncatedAtEndOfSync() } returns true + val catState = persister.load(catStream) + assertEquals( + setOf("cat/1", "cat/2"), + catState.getObjectsToDelete().map { it.second.key }.toSet() + ) + + val turtleStream = mockk(relaxed = true) + every { turtleStream.descriptor } returns DestinationStream.Descriptor("test", "turtle-1") + every { turtleStream.minimumGenerationId } returns 3L + every { turtleStream.shouldBeTruncatedAtEndOfSync() } returns false + val turtleState = persister.load(turtleStream) + assertEquals(0, turtleState.getObjectsToDelete().size) + } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt index 1712ff0c8e054..9b8868b9148d6 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt @@ -58,7 +58,7 @@ class RecordToPartAccumulatorTest { @Test fun `test parts are emitted correctly`() = runTest { - val fileNumber = AtomicLong(111L) + val fileNumber = AtomicLong(110L) val acc = RecordToPartAccumulator( pathFactory = pathFactory, diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 889be9228b3a2..fcf4034d094f1 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -58,11 +58,6 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store - - name: SECRET_DESTINATION-S3-V2-JSONL-STAGING - fileName: s3_dest_v2_jsonl_staging_config.json - secretStore: - type: GSM - alias: airbyte-connector-testing-secret-store - name: SECRET_DESTINATION-S3-V2-CSV fileName: s3_dest_v2_csv_config.json secretStore: diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index bcff936063234..ad9e00a6684cb 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -93,24 +93,6 @@ class S3V2WriteTestJsonRootLevelFlattening : allTypesBehavior = Untyped, ) -@Disabled("Un-disable once staging is re-enabled") -class S3V2WriteTestJsonStaging : - S3V2WriteTest( - S3V2TestUtils.JSON_STAGING_CONFIG_PATH, - UncoercedExpectedRecordMapper, - stringifySchemalessObjects = false, - unionBehavior = UnionBehavior.PASS_THROUGH, - schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH, - schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH, - preserveUndeclaredFields = true, - allTypesBehavior = Untyped, - commitDataIncrementally = false - ) { - @Test - @Disabled("Staging mode is not supported for file transfers") - override fun testBasicWriteFile() {} -} - class S3V2WriteTestJsonGzip : S3V2WriteTest( S3V2TestUtils.JSON_GZIP_CONFIG_PATH,