Skip to content

Commit

Permalink
use table cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 17, 2024
1 parent 6204548 commit ea13d27
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package io.airbyte.integrations.destination.iceberg.v2
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces
Expand All @@ -18,9 +20,14 @@ class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationClean
val namespace = it.level(0)
randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace)
}

// we're passing explicit TableIdentifier to clearTable, so just use SimpleTableIdGenerator
val tableCleaner = IcebergTableCleaner(IcebergUtil(SimpleTableIdGenerator()))

namespaces.forEach { namespace ->
catalog.listTables(namespace).forEach { table ->
catalog.dropTable(table, /* purge = */ true)
catalog.listTables(namespace).forEach { tableId ->
val table = catalog.loadTable(tableId)
tableCleaner.clearTable(catalog, tableId, table.io(), table.location())
}
catalog.dropNamespace(namespace)
}
Expand Down

0 comments on commit ea13d27

Please sign in to comment.