From 75b1c0bd323de3e6004d604362f2522fd1741b5a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?L=C3=A9on=20Stefani?=
Date: Thu, 2 Mar 2023 17:32:25 +0100
Subject: [PATCH] add three level namespace feature (#3)

* add three level namespace feature

* fix: allow compile

---------

Co-authored-by: Dennis Hinnenkamp
---
 ...atabricksAzureBlobStorageStreamCopier.java | 22 +++++++++--
 ...ksAzureBlobStorageStreamCopierFactory.java |  3 +-
 .../DatabricksDestinationConfig.java          | 18 +++++++++
 .../databricks/DatabricksS3StreamCopier.java  | 14 +++++--
 .../DatabricksS3StreamCopierFactory.java      |  3 +-
 .../databricks/DatabricksSqlOperations.java   |  4 ++
 .../databricks/DatabricksStreamCopier.java    | 37 +++++++++++++++----
 .../src/main/resources/spec.json              | 21 +++++++++--
 8 files changed, 104 insertions(+), 18 deletions(-)

diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopier.java
index 0184623ce4a11..acc6694e3c332 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopier.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopier.java
@@ -55,6 +55,7 @@ public class DatabricksAzureBlobStorageStreamCopier extends DatabricksStreamCopi
   protected String currentFile;
 
   public DatabricksAzureBlobStorageStreamCopier(final String stagingFolder,
+                                                final String catalog,
                                                 final String schema,
                                                 final ConfiguredAirbyteStream configuredStream,
                                                 final JdbcDatabase database,
@@ -63,7 +64,7 @@ public DatabricksAzureBlobStorageStreamCopier(final String stagingFolder,
                                                 final SqlOperations sqlOperations,
                                                 final SpecializedBlobClientBuilder specializedBlobClientBuilder,
                                                 final AzureBlobStorageConfig azureConfig) {
-    super(stagingFolder, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
+    super(stagingFolder, catalog, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
     this.specializedBlobClientBuilder = specializedBlobClientBuilder;
     this.azureConfig = azureConfig;
 
@@ -147,6 +148,13 @@ protected String getCreateTempTableStatement() {
 
     LOGGER.info("[Stream {}] tmp table schema: {}", stream.getName(), schemaString);
 
+    if (!useMetastore) {
+      return String.format("CREATE TABLE %s.%s.%s (%s) USING csv LOCATION '%s' "
+          + "options (\"header\" = \"true\", \"multiLine\" = \"true\") ;",
+          catalogName, schemaName, tmpTableName, schemaString,
+          getTmpTableLocation().replace(AZURE_BLOB_ENDPOINT_DOMAIN_NAME, AZURE_DFS_ENDPOINT_DOMAIN_NAME));
+    }
+
     return String.format("CREATE TABLE %s.%s (%s) USING csv LOCATION '%s' "
         + "options (\"header\" = \"true\", \"multiLine\" = \"true\") ;",
         schemaName, tmpTableName, schemaString,
@@ -172,10 +180,18 @@ public String generateMergeStatement(final String destTableName) {
     LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName);
     final var queries = new StringBuilder();
     if (destinationSyncMode.equals(DestinationSyncMode.OVERWRITE)) {
-      queries.append(sqlOperations.truncateTableQuery(database, schemaName, destTableName));
+      if (!useMetastore) {
+        queries.append(sqlOperations.truncateTableQuery(database, String.format("%s.%s", catalogName, schemaName), destTableName));
+      } else {
+        queries.append(sqlOperations.truncateTableQuery(database, schemaName, destTableName));
+      }
       LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.", destTableName, schemaName);
     }
-    queries.append(sqlOperations.insertTableQuery(database, schemaName, tmpTableName, destTableName));
+    if (!useMetastore) {
+      queries.append(sqlOperations.copyTableQuery(database, String.format("%s.%s", catalogName, schemaName), tmpTableName, destTableName));
+    } else {
+      queries.append(sqlOperations.copyTableQuery(database, schemaName, tmpTableName, destTableName));
+    }
     return queries.toString();
   }
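
The two branches in generateMergeStatement() above differ only in how the target table is qualified: schema.table against the workspace metastore, catalog.schema.table once the three-level Unity Catalog namespace is in play. A minimal, self-contained sketch of that naming rule (the class name and sample values are illustrative, not taken from the connector):

    public final class NamespaceSketch {

      // Mirrors the patch's convention: two-level names with the metastore,
      // three-level names when writing to an explicit Unity Catalog catalog.
      static String qualify(final boolean useMetastore, final String catalog, final String schema, final String table) {
        return useMetastore
            ? String.format("%s.%s", schema, table)
            : String.format("%s.%s.%s", catalog, schema, table);
      }

      public static void main(final String[] args) {
        System.out.println(qualify(true, "airbyte", "public", "users"));  // public.users
        System.out.println(qualify(false, "airbyte", "public", "users")); // airbyte.public.users
      }
    }
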
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopierFactory.java
index 6d3bb621f3621..5a83d21704dca 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopierFactory.java
@@ -27,13 +27,14 @@ public StreamCopier create(final String configuredSchema,
     try {
       final AirbyteStream stream = configuredStream.getStream();
       final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
+      final String catalog = databricksConfig.getDatabricksCatalog();
       final AzureBlobStorageConfig azureConfig = databricksConfig.getStorageConfig().getAzureBlobStorageConfigOrThrow();
       final SpecializedBlobClientBuilder specializedBlobClientBuilder = new SpecializedBlobClientBuilder()
           .endpoint(azureConfig.getEndpointUrl())
           .sasToken(azureConfig.getSasToken())
           .containerName(azureConfig.getContainerName());
 
-      return new DatabricksAzureBlobStorageStreamCopier(stagingFolder, schema, configuredStream, database,
+      return new DatabricksAzureBlobStorageStreamCopier(stagingFolder, catalog, schema, configuredStream, database,
           databricksConfig, nameTransformer, sqlOperations, specializedBlobClientBuilder, azureConfig);
     } catch (final Exception e) {
       throw new RuntimeException(e);
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java
index 489796081a347..83d0c4f041e10 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java
@@ -10,30 +10,38 @@ public class DatabricksDestinationConfig {
 
   static final String DEFAULT_DATABRICKS_PORT = "443";
+  static final String DEFAULT_DATABRICKS_CATALOG = "airbyte";
   static final String DEFAULT_DATABASE_SCHEMA = "public";
   static final boolean DEFAULT_PURGE_STAGING_DATA = true;
+  static final boolean DEFAULT_USE_METASTORE = true;
 
   private final String databricksServerHostname;
   private final String databricksHttpPath;
   private final String databricksPort;
   private final String databricksPersonalAccessToken;
+  private final String databricksCatalog;
   private final String databaseSchema;
   private final boolean purgeStagingData;
+  private final boolean useMetastore;
   private final DatabricksStorageConfig storageConfig;
 
   public DatabricksDestinationConfig(final String databricksServerHostname,
                                      final String databricksHttpPath,
                                      final String databricksPort,
                                      final String databricksPersonalAccessToken,
+                                     final String databricksCatalog,
                                      final String databaseSchema,
                                      final boolean purgeStagingData,
+                                     final boolean useMetastore,
                                      DatabricksStorageConfig storageConfig) {
     this.databricksServerHostname = databricksServerHostname;
     this.databricksHttpPath = databricksHttpPath;
     this.databricksPort = databricksPort;
     this.databricksPersonalAccessToken = databricksPersonalAccessToken;
+    this.databricksCatalog = databricksCatalog;
     this.databaseSchema = databaseSchema;
     this.purgeStagingData = purgeStagingData;
+    this.useMetastore = useMetastore;
     this.storageConfig = storageConfig;
   }
 
@@ -47,8 +55,10 @@ public static DatabricksDestinationConfig get(final JsonNode config) {
         config.get("databricks_http_path").asText(),
         config.has("databricks_port") ? config.get("databricks_port").asText() : DEFAULT_DATABRICKS_PORT,
         config.get("databricks_personal_access_token").asText(),
+        config.has("databricks_catalog") ? config.get("databricks_catalog").asText() : DEFAULT_DATABRICKS_CATALOG,
         config.has("database_schema") ? config.get("database_schema").asText() : DEFAULT_DATABASE_SCHEMA,
         config.has("purge_staging_data") ? config.get("purge_staging_data").asBoolean() : DEFAULT_PURGE_STAGING_DATA,
+        config.has("use_metastore") ? config.get("use_metastore").asBoolean() : DEFAULT_USE_METASTORE,
         DatabricksStorageConfig.getDatabricksStorageConfig(config.get("data_source")));
   }
 
@@ -68,6 +78,10 @@ public String getDatabricksPersonalAccessToken() {
     return databricksPersonalAccessToken;
   }
 
+  public String getDatabricksCatalog() {
+    return databricksCatalog;
+  }
+
   public String getDatabaseSchema() {
     return databaseSchema;
   }
@@ -76,6 +90,10 @@ public boolean isPurgeStagingData() {
     return purgeStagingData;
   }
 
+  public boolean isUseMetastore() {
+    return useMetastore;
+  }
+
   public DatabricksStorageConfig getStorageConfig() {
     return storageConfig;
   }
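
The config.has(...) ? ... : DEFAULT_... pattern above keeps both new options backwards compatible: configs written before this patch parse unchanged. A standalone sketch of the fallback behaviour using Jackson (the JSON value is made up; only the two new keys are shown):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class ConfigDefaultsSketch {

      public static void main(final String[] args) throws Exception {
        // A pre-patch config: neither databricks_catalog nor use_metastore is present.
        final JsonNode config = new ObjectMapper().readTree("{\"database_schema\": \"public\"}");

        final String catalog = config.has("databricks_catalog")
            ? config.get("databricks_catalog").asText()
            : "airbyte"; // DEFAULT_DATABRICKS_CATALOG
        final boolean useMetastore = config.has("use_metastore")
            ? config.get("use_metastore").asBoolean()
            : true; // DEFAULT_USE_METASTORE

        // Prints: catalog=airbyte useMetastore=true -> the legacy two-level behaviour is preserved.
        System.out.println("catalog=" + catalog + " useMetastore=" + useMetastore);
      }
    }
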
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopier.java
index 4605d81459d1e..104bb747fa0b4 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopier.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopier.java
@@ -50,6 +50,7 @@ public class DatabricksS3StreamCopier extends DatabricksStreamCopier {
   private final S3ParquetWriter parquetWriter;
 
   public DatabricksS3StreamCopier(final String stagingFolder,
+                                  final String catalog,
                                   final String schema,
                                   final ConfiguredAirbyteStream configuredStream,
                                   final AmazonS3 s3Client,
@@ -60,7 +61,7 @@ public DatabricksS3StreamCopier(final String stagingFolder,
                                   final S3WriterFactory writerFactory,
                                   final Timestamp uploadTime)
       throws Exception {
-    super(stagingFolder, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
+    super(stagingFolder, catalog, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
     this.s3Client = s3Client;
     this.s3Config = databricksConfig.getStorageConfig().getS3DestinationConfigOrThrow();
     final S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder);
@@ -103,17 +104,24 @@ protected String getDestTableLocation() {
 
   @Override
   protected String getCreateTempTableStatement() {
+    if (!useMetastore) {
+      return String.format("CREATE TABLE %s.%s.%s USING parquet LOCATION '%s';", catalogName, schemaName, tmpTableName, getTmpTableLocation());
+    }
     return String.format("CREATE TABLE %s.%s USING parquet LOCATION '%s';", schemaName, tmpTableName, getTmpTableLocation());
   }
 
   @Override
   public String generateMergeStatement(final String destTableName) {
+    String namespace = String.format("%s.%s", schemaName, destTableName);
+    if (!useMetastore) {
+      namespace = String.format("%s.%s.%s", catalogName, schemaName, destTableName);
+    }
     final String copyData = String.format(
-        "COPY INTO %s.%s " +
+        "COPY INTO %s " +
             "FROM '%s' " +
             "FILEFORMAT = PARQUET " +
             "PATTERN = '%s'",
-        schemaName, destTableName,
+        namespace,
         getTmpTableLocation(),
         parquetWriter.getOutputFilename());
     LOGGER.info(copyData);
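
Spelled out, generateMergeStatement() produces the same COPY INTO text in both modes except for the qualified target. A sketch of the three-level output (the path and file name are placeholders for what getTmpTableLocation() and parquetWriter.getOutputFilename() would return):

    public final class CopyIntoSketch {

      public static void main(final String[] args) {
        // useMetastore == false: catalog.schema.table, as generateMergeStatement() builds it.
        final String namespace = String.format("%s.%s.%s", "airbyte", "public", "users");

        final String copyData = String.format(
            "COPY INTO %s " +
                "FROM '%s' " +
                "FILEFORMAT = PARQUET " +
                "PATTERN = '%s'",
            namespace,
            "s3://my-bucket/staging/users", // stand-in for getTmpTableLocation()
            "part-00000.parquet");          // stand-in for parquetWriter.getOutputFilename()

        // COPY INTO airbyte.public.users FROM 's3://my-bucket/staging/users' FILEFORMAT = PARQUET PATTERN = 'part-00000.parquet'
        System.out.println(copyData);
      }
    }
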
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopierFactory.java
index 1b85fa51f308a..ff52884162f6b 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksS3StreamCopierFactory.java
@@ -29,12 +29,13 @@ public StreamCopier create(final String configuredSchema,
     try {
       final AirbyteStream stream = configuredStream.getStream();
       final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
+      final String catalog = databricksConfig.getDatabricksCatalog();
       S3DestinationConfig s3Config = databricksConfig.getStorageConfig().getS3DestinationConfigOrThrow();
       final AmazonS3 s3Client = s3Config.getS3Client();
       final ProductionWriterFactory writerFactory = new ProductionWriterFactory();
       final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
 
-      return new DatabricksS3StreamCopier(stagingFolder, schema, configuredStream, s3Client, database,
+      return new DatabricksS3StreamCopier(stagingFolder, catalog, schema, configuredStream, s3Client, database,
           databricksConfig, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
     } catch (final Exception e) {
       throw new RuntimeException(e);
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java
index 7c360d7d2a108..73eef6947af8a 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java
@@ -38,6 +38,10 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
         JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
   }
 
+  public void createCatalogIfNotExists(final JdbcDatabase database, final String catalogName) throws Exception {
+    database.execute(String.format("create catalog if not exists %s;", catalogName));
+  }
+
   @Override
   public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
     database.execute(String.format("create database if not exists %s;", schemaName));
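
For reference, the two helpers above issue plain DDL strings. Printed rather than executed, the statements look like this — a sketch with illustrative names; note that the createDestinationSchema() override in the next file passes a catalog-qualified schema when the metastore is bypassed:

    public final class CatalogDdlSketch {

      public static void main(final String[] args) {
        final String catalogName = "airbyte";       // illustrative
        final String schemaName = "airbyte.public"; // catalog-qualified, as the !useMetastore path passes it

        // What createCatalogIfNotExists() and createSchemaIfNotExists() hand to database.execute():
        System.out.println(String.format("create catalog if not exists %s;", catalogName));
        System.out.println(String.format("create database if not exists %s;", schemaName));
      }
    }
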
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java
index f525448e6d7be..90e7a0482ce3c 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java
@@ -29,10 +29,12 @@ public abstract class DatabricksStreamCopier implements StreamCopier {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class);
 
+  protected final String catalogName;
   protected final String schemaName;
   protected final String streamName;
   protected final DestinationSyncMode destinationSyncMode;
   private final boolean purgeStagingData;
+  protected final boolean useMetastore;
 
   protected final JdbcDatabase database;
   protected final DatabricksSqlOperations sqlOperations;
@@ -44,16 +46,19 @@ public abstract class DatabricksStreamCopier implements StreamCopier {
   protected final DatabricksDestinationConfig databricksConfig;
 
   public DatabricksStreamCopier(final String stagingFolder,
+                                final String catalog,
                                 final String schema,
                                 final ConfiguredAirbyteStream configuredStream,
                                 final JdbcDatabase database,
                                 final DatabricksDestinationConfig databricksConfig,
                                 final ExtendedNameTransformer nameTransformer,
                                 final SqlOperations sqlOperations) {
+    this.catalogName = catalog;
     this.schemaName = schema;
     this.streamName = configuredStream.getStream().getName();
     this.destinationSyncMode = configuredStream.getDestinationSyncMode();
     this.purgeStagingData = databricksConfig.isPurgeStagingData();
+    this.useMetastore = databricksConfig.isUseMetastore();
 
     this.database = database;
     this.sqlOperations = (DatabricksSqlOperations) sqlOperations;
@@ -75,8 +80,15 @@ public DatabricksStreamCopier(final String stagingFolder,
 
   @Override
   public void createDestinationSchema() throws Exception {
-    LOGGER.info("[Stream {}] Creating database schema if it does not exist: {}", streamName, schemaName);
-    sqlOperations.createSchemaIfNotExists(database, schemaName);
+    if (!useMetastore) {
+      LOGGER.info("[Stream {}] Creating databricks catalog if it does not exist: {}", streamName, catalogName);
+      sqlOperations.createCatalogIfNotExists(database, catalogName);
+      LOGGER.info("[Stream {}] Creating database schema if it does not exist: {}.{}", streamName, catalogName, schemaName);
+      sqlOperations.createSchemaIfNotExists(database, String.format("%s.%s", catalogName, schemaName));
+    } else {
+      LOGGER.info("[Stream {}] Creating database schema if it does not exist: {}", streamName, schemaName);
+      sqlOperations.createSchemaIfNotExists(database, schemaName);
+    }
   }
 
   @Override
@@ -108,21 +120,28 @@ public String createDestinationTable() throws Exception {
         ? "CREATE OR REPLACE TABLE"
         : "CREATE TABLE IF NOT EXISTS";
 
+    String destNamespace = String.format("%s.%s", schemaName, destTableName);
+    String tmpNamespace = String.format("%s.%s", schemaName, tmpTableName);
+    if (!useMetastore) {
+      destNamespace = String.format("%s.%s.%s", catalogName, schemaName, destTableName);
+      tmpNamespace = String.format("%s.%s.%s", catalogName, schemaName, tmpTableName);
+    }
+
     final String createTable = String.format(
-        "%s %s.%s " +
+        "%s %s " +
            "USING delta " +
            "LOCATION '%s' " +
            "COMMENT 'Created from stream %s' " +
            "TBLPROPERTIES ('airbyte.destinationSyncMode' = '%s', %s) " +
            // create the table based on the schema of the tmp table
-            "AS SELECT * FROM %s.%s LIMIT 0",
+            "AS SELECT * FROM %s LIMIT 0",
        createStatement,
-        schemaName, destTableName,
+        destNamespace,
        getDestTableLocation(),
        streamName,
        destinationSyncMode.value(),
        String.join(", ", DatabricksConstants.DEFAULT_TBL_PROPERTIES),
-        schemaName, tmpTableName);
+        tmpNamespace);
     LOGGER.info(createTable);
     database.execute(createTable);
 
@@ -133,7 +152,11 @@ public void removeFileAndDropTmpTable() throws Exception {
     if (purgeStagingData) {
       LOGGER.info("[Stream {}] Deleting tmp table: {}", streamName, tmpTableName);
-      sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
+      if (!useMetastore) {
+        sqlOperations.dropTableIfExists(database, String.format("%s.%s", catalogName, schemaName), tmpTableName);
+      } else {
+        sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
+      }
 
       deleteStagingFile();
     }
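
Putting the pieces of createDestinationTable() together, the DDL for the three-level case comes out roughly as follows — a sketch with a placeholder location, and with DatabricksConstants.DEFAULT_TBL_PROPERTIES stubbed by a single made-up property:

    public final class CreateTableSketch {

      public static void main(final String[] args) {
        // useMetastore == false, so both namespaces carry the catalog prefix.
        final String destNamespace = "airbyte.public.users";
        final String tmpNamespace = "airbyte.public._airbyte_tmp_users";

        final String createTable = String.format(
            "%s %s " +
                "USING delta " +
                "LOCATION '%s' " +
                "COMMENT 'Created from stream %s' " +
                "TBLPROPERTIES ('airbyte.destinationSyncMode' = '%s', %s) " +
                "AS SELECT * FROM %s LIMIT 0",
            "CREATE TABLE IF NOT EXISTS",
            destNamespace,
            "s3://my-bucket/data/users",      // stand-in for getDestTableLocation()
            "users",
            "append",
            "'some.table.property' = 'true'", // stub for DEFAULT_TBL_PROPERTIES
            tmpNamespace);

        System.out.println(createTable);
      }
    }
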
diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
index 55e0e4fe71a73..91e69a584463c 100644
--- a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json
@@ -53,13 +53,21 @@
       "airbyte_secret": true,
       "order": 5
     },
+    "databricks_catalog": {
+      "title": "Databricks Catalog",
+      "type": "string",
+      "description": "The default catalog tables are written to.",
+      "default": "airbyte",
+      "examples": ["airbyte"],
+      "order": 6
+    },
     "database_schema": {
       "title": "Database Schema",
       "type": "string",
       "description": "The default schema tables are written to if the source does not specify a namespace. Unless specifically configured, the usual value for this field is \"public\".",
       "default": "public",
       "examples": ["public"],
-      "order": 6
+      "order": 7
     },
     "data_source": {
       "title": "Data Source",
@@ -213,14 +221,21 @@
         }
       }
     ],
-      "order": 7
+      "order": 8
     },
     "purge_staging_data": {
       "title": "Purge Staging Files and Tables",
       "type": "boolean",
       "description": "Default to 'true'. Switch it to 'false' for debugging purpose.",
       "default": true,
-      "order": 8
+      "order": 9
+    },
+    "use_metastore": {
+      "title": "Use Metastore as Catalog",
+      "type": "boolean",
+      "description": "Default to 'true'. Switch it to 'false' to use a three-level namespace and create a new catalog (this requires Unity Catalog to be enabled).",
+      "default": true,
+      "order": 10
     }
   }
 }
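
A config exercising the new options end to end might look like the following, composed with Jackson so the example stays in Java; every value is fabricated, and the keys follow spec.json above:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public final class SampleConfigSketch {

      public static void main(final String[] args) {
        final ObjectNode config = new ObjectMapper().createObjectNode();
        config.put("databricks_server_hostname", "abc-12345678-wxyz.cloud.databricks.com");
        config.put("databricks_http_path", "sql/protocolv1/o/1234567890/0000-000000-abcd123");
        config.put("databricks_personal_access_token", "dapi0123456789abcdef");
        config.put("databricks_catalog", "airbyte"); // new: target catalog
        config.put("database_schema", "public");
        config.put("use_metastore", false);          // new: opt in to the three-level namespace
        System.out.println(config.toPrettyString());
      }
    }
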