Skip to content

Commit

Permalink
Make efficient table discovery during read (#52556)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Jan 29, 2025
1 parent ae01f28 commit d146b27
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 29 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.48.7 | 2025-01-26 | [\#52556](https://github.com/airbytehq/airbyte/pull/52556) | Make efficient table discovery during read |
| 0.48.6 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Fix flaky source mssql tests |
| 0.48.5 | 2025-01-16 | [\#51583](https://github.com/airbytehq/airbyte/pull/51583) | Also save SSL key to /tmp in destination-postgres |
| 0.48.4 | 2024-12-24 | [\#50410](https://github.com/airbytehq/airbyte/pull/50410) | Save SSL key to /tmp |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.48.6
version=0.48.7
Original file line number Diff line number Diff line change
Expand Up @@ -389,31 +389,7 @@ abstract class AbstractJdbcSource<Datatype>(
)
}
.values
.map { fields: List<JsonNode> ->
TableInfo<CommonField<Datatype>>(
nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(),
name = fields[0].get(INTERNAL_TABLE_NAME).asText(),
fields =
fields
// read the column metadata Json object, and determine its
// type
.map { f: JsonNode ->
val datatype = sourceOperations.getDatabaseFieldType(f)
val jsonType = getAirbyteType(datatype)
LOGGER.debug {
"Table ${fields[0].get(INTERNAL_TABLE_NAME).asText()} column ${f.get(INTERNAL_COLUMN_NAME).asText()}" +
"(type ${f.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${f.get(INTERNAL_COLUMN_SIZE).asInt()}], " +
"nullable ${f.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType"
}
object :
CommonField<Datatype>(
f.get(INTERNAL_COLUMN_NAME).asText(),
datatype
) {}
},
cursorFields = extractCursorFields(fields)
)
}
.map { fields: List<JsonNode> -> jsonFieldListToTableInfo(fields) }
}

private fun extractCursorFields(fields: List<JsonNode>): List<String> {
Expand Down Expand Up @@ -579,6 +555,53 @@ abstract class AbstractJdbcSource<Datatype>(
)
}

/**
 * Discovers a single table by reading its column metadata through JDBC.
 *
 * `DatabaseMetaData.getColumns` interprets the schema and table arguments as SQL `LIKE`
 * patterns, so a name containing `_` or `%` may also return columns of other tables. The rows
 * are therefore grouped per (schema, table) and the group whose key equals the requested pair
 * is preferred. If no group matches exactly, the first group is used, which is the behavior
 * this method had before the exact-match preference was added.
 *
 * @param database database used to run the metadata query
 * @param schema schema (namespace) of the table to look up
 * @param tableName name of the table to look up
 * @return the table's [TableInfo], or `null` when the metadata query returns no columns
 */
override fun discoverTable(
    database: JdbcDatabase,
    schema: String,
    tableName: String
): TableInfo<CommonField<Datatype>>? {
    LOGGER.info { "Discover table: $schema.$tableName" }
    val columnsByTable =
        database
            .bufferedResultSetQuery<JsonNode>(
                { connection: Connection ->
                    connection.metaData.getColumns(getCatalog(database), schema, tableName, null)
                },
                { resultSet: ResultSet -> this.getColumnMetadata(resultSet) }
            )
            .groupBy { t: JsonNode ->
                ImmutablePair.of<String, String>(
                    t.get(INTERNAL_SCHEMA_NAME).asText(),
                    t.get(INTERNAL_TABLE_NAME).asText()
                )
            }
    // Prefer the exact (schema, table) group; a LIKE wildcard in the requested names could
    // otherwise put an unrelated table first.
    val columns =
        columnsByTable[ImmutablePair.of<String, String>(schema, tableName)]
            ?: columnsByTable.values.firstOrNull()
    return columns?.let { jsonFieldListToTableInfo(it) }
}

/**
 * Builds a [TableInfo] from the column-metadata rows of one table.
 *
 * The schema and table names are taken from the first row. Every row becomes one
 * [CommonField] whose database type is resolved through `sourceOperations`.
 *
 * @param fields column metadata rows of a single table; the first element is read
 *   unconditionally, so the list must not be empty
 * @return the table description, including its fields and the result of
 *   `extractCursorFields` for the same rows
 */
private fun jsonFieldListToTableInfo(fields: List<JsonNode>): TableInfo<CommonField<Datatype>> {
    val head = fields[0]
    val schemaName = head.get(INTERNAL_SCHEMA_NAME).asText()
    val tableName = head.get(INTERNAL_TABLE_NAME).asText()

    // Turn each column metadata Json object into a typed field.
    val columns: List<CommonField<Datatype>> =
        fields.map { column: JsonNode ->
            val datatype = sourceOperations.getDatabaseFieldType(column)
            val jsonType = getAirbyteType(datatype)
            val columnName = column.get(INTERNAL_COLUMN_NAME).asText()
            LOGGER.debug {
                "Table $tableName column ${column.get(INTERNAL_COLUMN_NAME).asText()}" +
                    "(type ${column.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${column.get(INTERNAL_COLUMN_SIZE).asInt()}], " +
                    "nullable ${column.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType"
            }
            object : CommonField<Datatype>(columnName, datatype) {}
        }
    val cursors = extractCursorFields(fields)

    return TableInfo<CommonField<Datatype>>(
        nameSpace = schemaName,
        name = tableName,
        fields = columns,
        cursorFields = cursors
    )
}

/** Reports whether [type] is a cursor type, as determined by `sourceOperations`. */
public override fun isCursorType(type: Datatype): Boolean = sourceOperations.isCursorType(type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected constructor(driverClassName: String) :
logPreSyncDebugData(database, catalog)

val fullyQualifiedTableNameToInfo =
discoverWithoutSystemTables(database).associateBy {
discoverWithoutSystemTables(database, catalog).associateBy {
String.format("%s.%s", it.nameSpace, it.name)
}

Expand Down Expand Up @@ -289,6 +289,22 @@ protected constructor(driverClassName: String) :
/* no-op */
}

/**
 * Discovers only the tables referenced by the streams of the configured catalog.
 *
 * Each configured stream is looked up individually through [discoverTable]; streams for which
 * no table is returned are left out of the result.
 *
 * @param database source database
 * @param catalog configured catalog whose streams name the tables to discover
 * @return one [TableInfo] per stream whose table was found, in catalog order
 */
@Throws(Exception::class)
protected fun discoverWithoutSystemTables(
    database: Database,
    catalog: ConfiguredAirbyteCatalog,
): List<TableInfo<CommonField<DataType>>> =
    catalog.streams.mapNotNull { airbyteStream: ConfiguredAirbyteStream ->
        val stream = airbyteStream.stream
        // NOTE(review): discoverTable takes a non-null schema; confirm stream.namespace is
        // always set for configured streams.
        discoverTable(database, stream.namespace, stream.name)?.also {
            LOGGER.info { "Discovered table: ${it.nameSpace}.${it.name}: $it" }
        }
    }

@Throws(Exception::class)
protected fun discoverWithoutSystemTables(
database: Database
Expand Down Expand Up @@ -723,6 +739,23 @@ protected constructor(driverClassName: String) :
tableInfos: List<TableInfo<CommonField<DataType>>>
): Map<String, MutableList<String>>

/**
 * Discovers a single table in the source database.
 *
 * @param database source database
 * @param schema schema of the table to discover
 * @param tableName name of the table to discover
 * @return information about the table, or `null` if no table was discovered
 */
protected abstract fun discoverTable(
database: Database,
schema: String,
tableName: String
): TableInfo<CommonField<DataType>>?

protected abstract val quoteString: String?

/**
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.48.6'
cdkVersionRequired = '0.48.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.19
dockerImageTag: 4.1.20
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.20 | 2025-01-26 | [52556](https://github.com/airbytehq/airbyte/pull/52556) | Improve table discovery during read. |
| 4.1.19 | 2025-01-16 | [51596](https://github.com/airbytehq/airbyte/pull/51596) | Bump driver versions to latest (jdbc, debezium, cdk) |
| 4.1.18 | 2025-01-06 | [50943](https://github.com/airbytehq/airbyte/pull/50943) | Use airbyte/java-connector-base:2.0.0. This makes the image rootless. The connector will be incompatible with Airbyte < 0.64. |
| 4.1.17 | 2024-12-17 | [49840](https://github.com/airbytehq/airbyte/pull/49840) | Use a base image: airbyte/java-connector-base:1.0.0 |
Expand Down

0 comments on commit d146b27

Please sign in to comment.