Skip to content

Commit

Permalink
Load CDK: Drop vestigial obj storage staging stuff (#54661)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Feb 25, 2025
1 parent 6146dcd commit 6196fb7
Show file tree
Hide file tree
Showing 16 changed files with 27 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package io.airbyte.cdk.load.command.object_storage

/**
 * Path settings read by ObjectStoragePathFactory when it builds object-storage paths.
 *
 * NOTE(review): this listing is a rendered diff hunk (`-6,10 +6,8`) without +/- markers, so
 * pre-commit and post-commit properties are shown together. Judging by the usages further
 * down the page, `stagingPrefix`, `pathSuffixPattern` and `usesStagingDirectory` look like
 * the removed properties and `pathPattern` like the added one — confirm against the repo.
 */
data class ObjectStoragePathConfiguration(
// Base path prefix; the factory drops one trailing '/' from it to form `finalPrefix`.
val prefix: String,
// Explicit staging prefix. When null, the factory derived one from `prefix` joined with
// DEFAULT_STAGING_PREFIX_SUFFIX.
val stagingPrefix: String?,
// Path pattern under what appears to be its earlier name; null falls back to
// DEFAULT_PATH_FORMAT in the factory.
val pathSuffixPattern: String?,
// Path pattern; null falls back to DEFAULT_PATH_FORMAT in the factory.
val pathPattern: String?,
// File-name pattern; null falls back to DEFAULT_FILE_FORMAT in the factory.
val fileNamePattern: String?,
// When false, reading the factory's staging prefix threw UnsupportedOperationException.
val usesStagingDirectory: Boolean
)

interface ObjectStoragePathConfigurationProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,18 @@ import java.time.format.DateTimeFormatter
import java.util.*

/**
 * Builds directory and file paths for a [DestinationStream] in object storage.
 *
 * NOTE(review): this listing is a rendered diff hunk (`-21,24 +21,18`) without +/- markers.
 * The members that mention staging (`getStagingDirectory`, the `isStaging` parameters and
 * `supportsStaging`) appear to be the removed side of the diff; the one-argument
 * `getLongestStreamConstantPrefix` appears to be the added side — confirm against the repo.
 */
interface PathFactory {
// Two-argument form; `isStaging` chose between the staging and the final directory.
fun getLongestStreamConstantPrefix(stream: DestinationStream, isStaging: Boolean): String
// Directory for the stream under the staging prefix. See getFinalDirectory for the meaning
// of `substituteStreamAndNamespaceOnly`.
fun getStagingDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean = false
): String
// In ObjectStoragePathFactory: the final directory, resolved with only the stream-constant
// variables, truncated at the first '$' character.
fun getLongestStreamConstantPrefix(stream: DestinationStream): String
// Directory for the stream under the final prefix. When `substituteStreamAndNamespaceOnly`
// is true the implementation applies PATH_VARIABLES_STREAM_CONSTANT instead of
// PATH_VARIABLES, so other placeholders stay in the result.
fun getFinalDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean = false
): String
// Full path to one file of the stream. A null `extension` makes the implementation use its
// default extension; `partNumber` is passed into the file-name formatting context.
fun getPathToFile(
stream: DestinationStream,
partNumber: Long?,
isStaging: Boolean = false,
extension: String? = null
): String
// Matcher for paths belonging to the stream. NOTE(review): the implementation is not visible
// here; the tests only show that a match result exposes `partNumber` — confirm the rest.
fun getPathMatcher(stream: DestinationStream, suffixPattern: String? = null): PathMatcher

// Mirrors `usesStagingDirectory` from the path configuration in ObjectStoragePathFactory.
val supportsStaging: Boolean
// Configured prefix; ObjectStoragePathFactory removes one trailing '/' if present.
val finalPrefix: String
}

Expand Down Expand Up @@ -84,21 +78,8 @@ class ObjectStoragePathFactory(
) : PathFactory {
// Resolved configuration
private val pathConfig = pathConfigProvider.objectStoragePathConfiguration
override val supportsStaging: Boolean = pathConfig.usesStagingDirectory

// Resolved bucket path prefixes
private val stagingPrefixResolved =
pathConfig.stagingPrefix
?: Paths.get(pathConfig.prefix, DEFAULT_STAGING_PREFIX_SUFFIX).toString()
private val stagingPrefix: String
get() =
if (!pathConfig.usesStagingDirectory) {
throw UnsupportedOperationException(
"Staging is not supported by this configuration"
)
} else {
stagingPrefixResolved
}

// Resolved bucket path prefix
override val finalPrefix: String =
if (pathConfig.prefix.endsWith('/')) {
pathConfig.prefix.take(pathConfig.prefix.length - 1)
Expand All @@ -107,7 +88,7 @@ class ObjectStoragePathFactory(
}

// Resolved path and filename patterns
private val pathPatternResolved = pathConfig.pathSuffixPattern ?: DEFAULT_PATH_FORMAT
private val pathPatternResolved = pathConfig.pathPattern ?: DEFAULT_PATH_FORMAT
private val filePatternResolved = pathConfig.fileNamePattern ?: DEFAULT_FILE_FORMAT

// Resolved file extensions
Expand Down Expand Up @@ -292,20 +273,6 @@ class ObjectStoragePathFactory(
}
}

/**
 * Resolves the directory for [stream] underneath the staging prefix.
 *
 * NOTE(review): per the hunk header (`-292,20 +273,6`) this whole function appears to be
 * on the removed side of the diff — confirm before relying on it.
 *
 * @param stream the stream whose directory is resolved.
 * @param substituteStreamAndNamespaceOnly when true, only PATH_VARIABLES_STREAM_CONSTANT is
 * applied to the pattern; otherwise the full PATH_VARIABLES list is applied.
 * @return the formatted path after passing through `resolveRetainingTerminalSlash`.
 */
override fun getStagingDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
): String {
val path =
getFormattedPath(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
// Selects the staging prefix rather than the final prefix inside getFormattedPath.
isStaging = true
)
return resolveRetainingTerminalSlash(path)
}

override fun getFinalDirectory(
stream: DestinationStream,
substituteStreamAndNamespaceOnly: Boolean
Expand All @@ -315,37 +282,25 @@ class ObjectStoragePathFactory(
stream,
if (substituteStreamAndNamespaceOnly) PATH_VARIABLES_STREAM_CONSTANT
else PATH_VARIABLES,
isStaging = false
)
return resolveRetainingTerminalSlash(path)
}

/**
 * Returns the leading part of the stream's directory path, up to but not including the
 * first '$' character.
 *
 * The directory is resolved with `substituteStreamAndNamespaceOnly = true`, so any
 * placeholder that is not in PATH_VARIABLES_STREAM_CONSTANT is still present as `${...}`
 * and marks where the returned prefix stops.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers. The `isStaging`
 * parameter and the if/else expression appear to be the pre-commit body, and the last
 * `return` the post-commit body (its one-parameter signature line is not visible in this
 * hunk) — confirm against the repo.
 */
override fun getLongestStreamConstantPrefix(
stream: DestinationStream,
isStaging: Boolean
): String {
// Earlier form: choose the staging or final directory, then truncate at the first '$'.
return if (isStaging) {
getStagingDirectory(stream, substituteStreamAndNamespaceOnly = true)
} else {
getFinalDirectory(stream, substituteStreamAndNamespaceOnly = true)
}
.takeWhile { it != '$' }
// Later form: only the final directory exists, truncated the same way.
return getFinalDirectory(stream, substituteStreamAndNamespaceOnly = true).takeWhile {
it != '$'
}
}

override fun getPathToFile(
stream: DestinationStream,
partNumber: Long?,
isStaging: Boolean,
extension: String?
): String {
val extensionResolved = extension ?: defaultExtension
val path =
if (isStaging) {
getStagingDirectory(stream)
} else {
getFinalDirectory(stream)
}
.toString()
val path = getFinalDirectory(stream)
val context =
VariableContext(stream, extension = extensionResolved, partNumber = partNumber)
val fileName = getFormattedFileName(context)
Expand All @@ -357,10 +312,8 @@ class ObjectStoragePathFactory(
/**
 * Builds the directory path for [stream]: joins a prefix with the resolved path pattern and
 * then folds over [variables], letting each one apply itself to the accumulated string.
 *
 * NOTE(review): this listing is a rendered diff hunk (`-357,10 +312,8`) without +/-
 * markers. The `isStaging` parameter, `selectedPrefix` and the first `pattern` line appear
 * to be the pre-commit side; the second `pattern` line, which always uses `finalPrefix`,
 * appears to be the post-commit side — confirm against the repo.
 */
private fun getFormattedPath(
stream: DestinationStream,
variables: List<PathVariable> = PATH_VARIABLES,
isStaging: Boolean
): String {
// Earlier form: pick the staging or the final prefix depending on the flag.
val selectedPrefix = if (isStaging) stagingPrefix else finalPrefix
val pattern = resolveRetainingTerminalSlash(selectedPrefix, pathPatternResolved)
// Later form: the final prefix is the only prefix.
val pattern = resolveRetainingTerminalSlash(finalPrefix, pathPatternResolved)
val context = VariableContext(stream)
// Variables are applied in list order; each receives the output of the previous one.
return variables.fold(pattern) { acc, variable -> variable.maybeApply(acc, context) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ObjectStorageDestinationState(
return emptyList()
}

val prefix = pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false)
val prefix = pathFactory.getLongestStreamConstantPrefix(stream)
log.info {
"Searching $prefix for objects to delete (minGenId=${stream.minimumGenerationId}; matcher=${matcher.regex})"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class RecordToPartAccumulator<U : OutputStream>(
pathFactory.getPathToFile(
stream,
fileNo,
isStaging = pathFactory.supportsStaging
)
),
fileNumber = fileNo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package io.airbyte.cdk.load.file.object_storage

import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
Expand All @@ -14,7 +12,6 @@ import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurati
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.file.GZIPProcessor
import io.airbyte.cdk.load.file.MockTimeProvider
import io.airbyte.cdk.load.file.TimeProvider
Expand Down Expand Up @@ -57,12 +54,10 @@ class ObjectStoragePathFactoryTest {
override val objectStoragePathConfiguration: ObjectStoragePathConfiguration =
ObjectStoragePathConfiguration(
prefix = "prefix",
stagingPrefix = "staging/prefix",
pathSuffixPattern =
pathPattern =
"\${NAMESPACE}/\${STREAM_NAME}/\${YEAR}/\${MONTH}/\${DAY}/\${HOUR}/\${MINUTE}/\${SECOND}/\${MILLISECOND}/\${EPOCH}/",
fileNamePattern =
"{date}-{date:yyyy_MM}-{timestamp}-{part_number}-{sync_id}{format_extension}",
usesStagingDirectory = true
)
}

Expand All @@ -75,7 +70,7 @@ class ObjectStoragePathFactoryTest {
MockPathConfigProvider()
.objectStoragePathConfiguration
.copy(
pathSuffixPattern =
pathPattern =
"\${NAMESPACE}/\${STREAM_NAME}/\${YEAR}/\${MONTH}/\${DAY}/\${HOUR}/\${MINUTE}/\${SECOND}/\${MILLISECOND}/\${EPOCH}_"
)
}
Expand All @@ -86,9 +81,7 @@ class ObjectStoragePathFactoryTest {
@Requires(property = "object-storage-path-factory-test.use-staging", value = "false")
class MockPathConfigProviderWithoutStaging : ObjectStoragePathConfigurationProvider {
override val objectStoragePathConfiguration: ObjectStoragePathConfiguration =
MockPathConfigProvider()
.objectStoragePathConfiguration
.copy(usesStagingDirectory = false)
MockPathConfigProvider().objectStoragePathConfiguration
}

@Singleton
Expand All @@ -109,112 +102,6 @@ class ObjectStoragePathFactoryTest {
ObjectStorageCompressionConfiguration(compressor = GZIPProcessor)
}

/**
 * Tests for ObjectStoragePathFactory with the `use-staging` test property set to "true".
 *
 * NOTE(review): per the hunk header (`-109,112 +102,6`) this whole nested class appears to
 * be on the removed side of the diff — confirm before relying on it.
 */
@Nested
@MicronautTest(
environments =
[
"ObjectStoragePathFactoryTest",
"MockDestinationCatalog",
],
)
@Property(name = "object-storage-path-factory-test.use-staging", value = "true")
inner class ObjectStoragePathFactoryTestWithStaging {
// Staging paths are expected to equal the final paths with "staging/" in front, for both
// the directory and the full file path.
@Test
fun testBasicBehavior(pathFactory: ObjectStoragePathFactory, timeProvider: TimeProvider) {
val syncTime = timeProvider.syncTimeMillis()
val wallTime = timeProvider.currentTimeMillis()
val stream1 = MockDestinationCatalogFactory.stream1
val (namespace, name) = stream1.descriptor
val prefixOnly = "prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/"
val fileName = "2020_01_02-2020_01-$wallTime-173-42.jsonl.gz"
Assertions.assertEquals(
"staging/$prefixOnly",
pathFactory.getStagingDirectory(stream1).toString(),
)
Assertions.assertEquals(
prefixOnly,
pathFactory.getFinalDirectory(stream1).toString(),
)
Assertions.assertEquals(
"staging/$prefixOnly$fileName",
pathFactory.getPathToFile(stream1, 173, true).toString(),
)
Assertions.assertEquals(
"$prefixOnly$fileName",
pathFactory.getPathToFile(stream1, 173, false).toString(),
)
}

// The stream's path matcher must accept a fully formatted final path and report the part
// number embedded in the file name (173).
@Test
fun testPathMatchingPattern(
pathFactory: ObjectStoragePathFactory,
timeProvider: TimeProvider
) {
val syncTime = timeProvider.syncTimeMillis()
val stream1 = MockDestinationCatalogFactory.stream1
val (namespace, name) = stream1.descriptor
val expectedToMatch =
"prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/2020_01_02-2020_01-1577934245678-173-42.jsonl.gz"
val match = pathFactory.getPathMatcher(stream1).match(expectedToMatch)
Assertions.assertTrue(match != null)
Assertions.assertTrue(match?.partNumber == 173L)
}

// Same as above for a stream whose namespace is null: the expected path has no namespace
// segment between the prefix and the stream name.
@Test
fun testPathMatchingPatternWithEmptyStream(
pathFactory: ObjectStoragePathFactory,
timeProvider: TimeProvider
) {
val epochMilli = timeProvider.currentTimeMillis()
val stream1 = MockDestinationCatalogFactory.stream1
val (_, name) = stream1.descriptor
val emptyNamespaceStream =
stream1.copy(descriptor = stream1.descriptor.copy(namespace = null))
val expectedToMatch =
"prefix/$name/2020/01/02/03/04/05/0678/$epochMilli/2020_01_02-2020_01-1577934245678-173-42.jsonl.gz"
val match = pathFactory.getPathMatcher(emptyNamespaceStream).match(expectedToMatch)
Assertions.assertTrue(match != null)
Assertions.assertTrue(match?.partNumber == 173L)
}

// A stream name containing an accented character ("spécial") is expected to appear in the
// directory path with the accent removed ("special"); the ':' characters are kept.
@Test
fun testSpecialCharacterInStream(
pathFactory: ObjectStoragePathFactory,
timeProvider: TimeProvider
) {
val epochMilli = timeProvider.syncTimeMillis()
val streamWithSpecial =
DestinationStream(
DestinationStream.Descriptor(
"namespace",
"stream_with:spécial:characters",
),
generationId = 0,
minimumGenerationId = 0,
syncId = 0,
schema = StringType,
importType = Append
)
val expectedPath =
"prefix/namespace/stream_with:special:characters/2020/01/02/03/04/05/0678/$epochMilli/"
Assertions.assertEquals(
expectedPath,
pathFactory.getFinalDirectory(streamWithSpecial),
)
}

// The longest stream-constant prefix should stop right after the namespace and stream name,
// before the first time-based path segment.
@Test
fun testLongestConstantPrefix(pathFactory: ObjectStoragePathFactory) {
val stream1 = MockDestinationCatalogFactory.stream1
val (namespace, name) = stream1.descriptor
val prefixOnly = "prefix/$namespace/$name/"
Assertions.assertEquals(
prefixOnly,
pathFactory.getLongestStreamConstantPrefix(stream1, false)
)
}
}

@Nested
@MicronautTest(
environments =
Expand All @@ -239,15 +126,8 @@ class ObjectStoragePathFactoryTest {
)
Assertions.assertEquals(
"$prefixOnly$fileName",
pathFactory.getPathToFile(stream1, 173, false),
pathFactory.getPathToFile(stream1, 173),
)

Assertions.assertThrows(UnsupportedOperationException::class.java) {
pathFactory.getStagingDirectory(stream1)
}
Assertions.assertThrows(UnsupportedOperationException::class.java) {
pathFactory.getPathToFile(stream1, 173, true)
}
}
}

Expand Down Expand Up @@ -278,7 +158,7 @@ class ObjectStoragePathFactoryTest {
)
Assertions.assertEquals(
"$prefixOnly$fileName",
pathFactory.getPathToFile(stream1, 173, false),
pathFactory.getPathToFile(stream1, 173),
)
}
}
Expand Down
Loading

0 comments on commit 6196fb7

Please sign in to comment.