Destination S3 Data Lake: extract AWS-specific pieces; move generic stuff to toolkit #53697

Merged

3 commits merged on Feb 14, 2025
Changes from 1 commit
@@ -33,6 +33,7 @@ class S3DataLakeChecker(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
)
s3DataLakeUtil.createNamespaceWithGlueHandling(testTableIdentifier, catalog)
val table =
s3DataLakeUtil.createTable(
testTableIdentifier,
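Note on the hunk above: table creation is now a two-step contract, so every call site first creates the namespace (with Glue-aware handling) and only then asks for the table. A minimal caller sketch, assuming the Iceberg catalog, schema, and catalog properties have already been built as in the surrounding hunks; the connector-internal types (S3DataLakeUtil, DestinationStream) are assumed to be on the classpath, and the helper name ensureNamespaceAndTable is made up for illustration:

```kotlin
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.Catalog

// Hypothetical helper illustrating the new two-step contract: the namespace is
// created (with Glue-specific handling) before createTable is called.
fun ensureNamespaceAndTable(
    s3DataLakeUtil: S3DataLakeUtil,
    descriptor: DestinationStream.Descriptor,
    catalog: Catalog,
    schema: Schema,
    properties: Map<String, String>,
): Table {
    // Step 1: make sure the namespace exists; Glue's concurrent-creation
    // failure is handled inside this call (see S3DataLakeUtil below).
    s3DataLakeUtil.createNamespaceWithGlueHandling(descriptor, catalog)
    // Step 2: create or load the table; it assumes the namespace is present.
    return s3DataLakeUtil.createTable(
        streamDescriptor = descriptor,
        catalog = catalog,
        schema = schema,
        properties = properties,
    )
}
```
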
@@ -53,6 +53,7 @@ class S3DataLakeStreamLoader(
override suspend fun start() {
val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration)
val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
s3DataLakeUtil.createNamespaceWithGlueHandling(stream.descriptor, catalog)
table =
s3DataLakeUtil.createTable(
streamDescriptor = stream.descriptor,
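The reason the namespace step tolerates "already exists" style failures is that every stream gets its own loader, and several loaders may run start() against the same namespace at the same time. A rough sketch of that race, assuming a hypothetical list of loaders and kotlinx-coroutines on the classpath:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Hypothetical driver: several stream loaders sharing one target namespace
// call start() concurrently, so namespace creation can race. The util methods
// in the next hunk log and continue instead of failing the sync.
suspend fun startAll(loaders: List<S3DataLakeStreamLoader>) = coroutineScope {
    loaders.map { loader -> async { loader.start() } }.awaitAll()
}
```
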
@@ -109,24 +109,8 @@ class S3DataLakeUtil(
return CatalogUtil.buildIcebergCatalog(catalogName, properties, Configuration())
}

/**
* Builds (if necessary) an Iceberg [Table]. This includes creating the table's namespace if it
* does not already exist. If the [Table] already exists, it is loaded from the [Catalog].
*
* @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's
* namespace and name.
* @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once
* created.
* @param schema The Iceberg [Schema] associated with the [Table].
* @param properties The [Table] configuration properties derived from the [Catalog].
* @return The Iceberg [Table], created if it does not yet exist.
*/
fun createTable(
streamDescriptor: DestinationStream.Descriptor,
catalog: Catalog,
schema: Schema,
properties: Map<String, String>
): Table {
/** Create the namespace if it doesn't already exist. */
fun createNamespace(streamDescriptor: DestinationStream.Descriptor, catalog: Catalog) {
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
synchronized(tableIdentifier.namespace()) {
if (
@@ -145,15 +129,46 @@
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
} catch (e: ConcurrentModificationException) {
// do the same for AWS Glue
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
}
}
}
}

fun createNamespaceWithGlueHandling(
streamDescriptor: DestinationStream.Descriptor,
catalog: Catalog
) {
try {
createNamespace(streamDescriptor, catalog)
} catch (e: ConcurrentModificationException) {
// glue catalog throws its own special exception
logger.info {
"Namespace '${streamDescriptor.namespace}' was likely created by another thread during parallel operations."
}
}
}

/**
* Builds (if necessary) an Iceberg [Table]. If the [Table] already exists, it is loaded from
* the [Catalog].
*
* Assumes the namespace already exists. Use [createNamespace] if this is not guaranteed.
*
* @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's
* namespace and name.
* @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once
* created.
* @param schema The Iceberg [Schema] associated with the [Table].
* @param properties The [Table] configuration properties derived from the [Catalog].
* @return The Iceberg [Table], created if it does not yet exist.
*/
fun createTable(
streamDescriptor: DestinationStream.Descriptor,
catalog: Catalog,
schema: Schema,
properties: Map<String, String>
): Table {
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
return if (!catalog.tableExists(tableIdentifier)) {
logger.info { "Creating Iceberg table '$tableIdentifier'...." }
catalog
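The middle of createNamespace is collapsed in this diff, so the following is only an illustrative reconstruction of the create-if-absent pattern the visible lines imply (a namespaceExists check, a guarded createNamespace, and an AlreadyExistsException catch); it is not the connector's actual implementation, and the exact exception types it handles are not fully visible here:

```kotlin
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces
import org.apache.iceberg.exceptions.AlreadyExistsException

// Illustrative only: create a namespace if it is missing, tolerating the case
// where a parallel worker created it first. The real createNamespace also
// synchronizes on the namespace and logs through the connector's logger.
fun createNamespaceIfAbsent(catalog: Catalog, namespace: Namespace) {
    val namespaces = catalog as? SupportsNamespaces ?: return
    if (!namespaces.namespaceExists(namespace)) {
        try {
            namespaces.createNamespace(namespace)
        } catch (e: AlreadyExistsException) {
            // Another thread won the race; nothing else to do.
        }
    }
}
```
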
@@ -128,6 +128,7 @@ internal class S3DataLakeStreamLoaderTest {
every { table.manageSnapshots().createBranch(any()).commit() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createNamespaceWithGlueHandling(any(), any()) } just runs
every { createTable(any(), any(), any(), any()) } returns table
every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any<MapperPipeline>()) } answers
@@ -222,6 +223,7 @@
every { table.newScan().planFiles() } returns CloseableIterable.empty()
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createNamespaceWithGlueHandling(any(), any()) } just runs
every { createTable(any(), any(), any(), any()) } returns table
every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any<MapperPipeline>()) } answers
@@ -370,6 +372,7 @@
every { table.newScan().planFiles() } returns CloseableIterable.empty()
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createNamespaceWithGlueHandling(any(), any()) } just runs
every { createTable(any(), any(), any(), any()) } returns table
every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any<MapperPipeline>()) } answers
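Each mocked S3DataLakeUtil in these tests now needs a stub for the Unit-returning createNamespaceWithGlueHandling, which MockK expresses with `just runs`. A minimal sketch of that stubbing pattern, with an optional verifyOrder check that is not part of this PR and would only pass once the loader's start() has actually been exercised; S3DataLakeUtil is assumed to be on the classpath:

```kotlin
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import io.mockk.verifyOrder
import org.apache.iceberg.Table

fun sketchNamespaceBeforeTableStubbing(table: Table) {
    // Unit-returning functions are stubbed with `just runs`.
    val s3DataLakeUtil: S3DataLakeUtil = mockk {
        every { createNamespaceWithGlueHandling(any(), any()) } just runs
        every { createTable(any(), any(), any(), any()) } returns table
    }

    // ... build the stream loader with this util and call start() here ...

    // Illustrative assertion: namespace creation happens before table creation.
    verifyOrder {
        s3DataLakeUtil.createNamespaceWithGlueHandling(any(), any())
        s3DataLakeUtil.createTable(any(), any(), any(), any())
    }
}
```
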
@@ -98,6 +98,7 @@ internal class S3DataLakeUtilTest {
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns
false
}
s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
s3DataLakeUtil.createTable(
streamDescriptor = streamDescriptor,
@@ -134,6 +135,7 @@
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns
false
}
s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
s3DataLakeUtil.createTable(
streamDescriptor = streamDescriptor,
@@ -161,6 +163,7 @@
every { namespaceExists(any()) } returns true
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true
}
s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
s3DataLakeUtil.createTable(
streamDescriptor = streamDescriptor,
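A natural follow-up test, not included in this PR, would assert that the Glue-handling wrapper swallows the concurrent-creation failure instead of failing the sync. Because S3DataLakeUtil's constructor is not visible in this diff, the sketch below uses a hypothetical stand-in interface, and it uses java.util.ConcurrentModificationException for illustration since the hunks above catch only the unqualified name:

```kotlin
import io.mockk.every
import io.mockk.mockk
import java.util.ConcurrentModificationException

// Hypothetical stand-in for the namespace-creating collaborator.
fun interface NamespaceCreator {
    fun createNamespace()
}

// Mirrors the shape of createNamespaceWithGlueHandling: the Glue catalog's
// concurrent-creation error is treated as "someone else created it first".
fun createNamespaceTolerantly(creator: NamespaceCreator) {
    try {
        creator.createNamespace()
    } catch (e: ConcurrentModificationException) {
        // Namespace was likely created by another thread; continue.
    }
}

fun main() {
    val creator: NamespaceCreator = mockk {
        every { createNamespace() } throws ConcurrentModificationException()
    }
    // Must not throw: the wrapper absorbs the Glue-style race.
    createNamespaceTolerantly(creator)
    println("ConcurrentModificationException was swallowed as expected")
}
```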