add three level namespace feature (#3)
* add three level namespace feature

* fix: allow compile

---------

Co-authored-by: Dennis Hinnenkamp <[email protected]>
shrodingers and mcfuhrt authored Mar 2, 2023
1 parent 8785691 commit 75b1c0b
Showing 8 changed files with 104 additions and 18 deletions.
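In short: everywhere the connector previously built a two-level schema.table identifier, it now branches on a new use_metastore flag and, when the flag is disabled, prepends the configured catalog to form a three-level Unity Catalog identifier. A minimal sketch of the rule (a hypothetical helper; the diff itself inlines this via String.format at each call site):

// Hypothetical helper, not part of the commit: the naming rule the
// copiers below apply inline.
static String qualifiedName(final boolean useMetastore,
                            final String catalog,
                            final String schema,
                            final String table) {
  if (useMetastore) {
    // Legacy behavior: two-level name resolved against the workspace metastore.
    return String.format("%s.%s", schema, table);
  }
  // Three-level Unity Catalog name.
  return String.format("%s.%s.%s", catalog, schema, table);
}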
@@ -55,6 +55,7 @@ public class DatabricksAzureBlobStorageStreamCopier extends DatabricksStreamCopier
protected String currentFile;

public DatabricksAzureBlobStorageStreamCopier(final String stagingFolder,
final String catalog,
final String schema,
final ConfiguredAirbyteStream configuredStream,
final JdbcDatabase database,
@@ -63,7 +64,7 @@ public DatabricksAzureBlobStorageStreamCopier(final String stagingFolder,
final SqlOperations sqlOperations,
final SpecializedBlobClientBuilder specializedBlobClientBuilder,
final AzureBlobStorageConfig azureConfig) {
super(stagingFolder, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
super(stagingFolder, catalog, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);

this.specializedBlobClientBuilder = specializedBlobClientBuilder;
this.azureConfig = azureConfig;
@@ -147,6 +148,13 @@ protected String getCreateTempTableStatement() {

LOGGER.info("[Stream {}] tmp table schema: {}", stream.getName(), schemaString);

if (!useMetastore) {
return String.format("CREATE TABLE %s.%s.%s (%s) USING csv LOCATION '%s' " +
"options (\"header\" = \"true\", \"multiLine\" = \"true\") ;",
catalogName, schemaName, tmpTableName, schemaString,
getTmpTableLocation().replace(AZURE_BLOB_ENDPOINT_DOMAIN_NAME, AZURE_DFS_ENDPOINT_DOMAIN_NAME));
}

return String.format("CREATE TABLE %s.%s (%s) USING csv LOCATION '%s' " +
"options (\"header\" = \"true\", \"multiLine\" = \"true\") ;",
schemaName, tmpTableName, schemaString,
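Note that the new branch does more than prepend the catalog: it also rewrites the staging location's Azure Blob endpoint to the DFS endpoint before handing it to CREATE TABLE ... LOCATION. A sketch of that substitution, assuming the two constants (defined outside this diff) hold the usual domain names:

// Assumed values; the constants themselves are not shown in this commit.
final String AZURE_BLOB_ENDPOINT_DOMAIN_NAME = "blob.core.windows.net";
final String AZURE_DFS_ENDPOINT_DOMAIN_NAME = "dfs.core.windows.net";

final String tmpLocation = "https://myaccount.blob.core.windows.net/container/staging";
// -> "https://myaccount.dfs.core.windows.net/container/staging"
final String dfsLocation =
    tmpLocation.replace(AZURE_BLOB_ENDPOINT_DOMAIN_NAME, AZURE_DFS_ENDPOINT_DOMAIN_NAME);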
@@ -172,10 +180,18 @@ public String generateMergeStatement(final String destTableName) {
LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName);
final var queries = new StringBuilder();
if (destinationSyncMode.equals(DestinationSyncMode.OVERWRITE)) {
queries.append(sqlOperations.truncateTableQuery(database, schemaName, destTableName));
if (!useMetastore) {
queries.append(sqlOperations.truncateTableQuery(database, String.format("%s.%s", catalogName, schemaName), destTableName));
} else {
queries.append(sqlOperations.truncateTableQuery(database, schemaName, destTableName));
}
LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.", destTableName, schemaName);
}
queries.append(sqlOperations.insertTableQuery(database, schemaName, tmpTableName, destTableName));
if (!useMetastore) {
queries.append(sqlOperations.copyTableQuery(database, String.format("%s.%s", catalogName, schemaName), tmpTableName, destTableName));
} else {
queries.append(sqlOperations.copyTableQuery(database, schemaName, tmpTableName, destTableName));
}

return queries.toString();
}
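The two branch pairs above differ only in the schema argument, so one possible tightening (a behavior-preserving sketch using the fields already in scope) is to compute the catalog-qualified schema once:

// Sketch only: precompute the schema argument instead of branching twice.
final String schemaArg = useMetastore
    ? schemaName
    : String.format("%s.%s", catalogName, schemaName);
if (destinationSyncMode.equals(DestinationSyncMode.OVERWRITE)) {
  queries.append(sqlOperations.truncateTableQuery(database, schemaArg, destTableName));
}
queries.append(sqlOperations.copyTableQuery(database, schemaArg, tmpTableName, destTableName));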
@@ -27,13 +27,14 @@ public StreamCopier create(final String configuredSchema,
try {
final AirbyteStream stream = configuredStream.getStream();
final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
final String catalog = databricksConfig.getDatabricksCatalog();

final AzureBlobStorageConfig azureConfig = databricksConfig.getStorageConfig().getAzureBlobStorageConfigOrThrow();
final SpecializedBlobClientBuilder specializedBlobClientBuilder = new SpecializedBlobClientBuilder()
.endpoint(azureConfig.getEndpointUrl())
.sasToken(azureConfig.getSasToken())
.containerName(azureConfig.getContainerName());
return new DatabricksAzureBlobStorageStreamCopier(stagingFolder, schema, configuredStream, database,
return new DatabricksAzureBlobStorageStreamCopier(stagingFolder, catalog, schema, configuredStream, database,
databricksConfig, nameTransformer, sqlOperations, specializedBlobClientBuilder, azureConfig);
} catch (final Exception e) {
throw new RuntimeException(e);
@@ -10,30 +10,38 @@
public class DatabricksDestinationConfig {

static final String DEFAULT_DATABRICKS_PORT = "443";
static final String DEFAULT_DATABRICKS_CATALOG = "airbyte";
static final String DEFAULT_DATABASE_SCHEMA = "public";
static final boolean DEFAULT_PURGE_STAGING_DATA = true;
static final boolean DEFAULT_USE_METASTORE = true;

private final String databricksServerHostname;
private final String databricksHttpPath;
private final String databricksPort;
private final String databricksPersonalAccessToken;
private final String databricksCatalog;
private final String databaseSchema;
private final boolean purgeStagingData;
private final boolean useMetastore;
private final DatabricksStorageConfig storageConfig;

public DatabricksDestinationConfig(final String databricksServerHostname,
final String databricksHttpPath,
final String databricksPort,
final String databricksPersonalAccessToken,
final String databricksCatalog,
final String databaseSchema,
final boolean purgeStagingData,
final boolean useMetastore,
DatabricksStorageConfig storageConfig) {
this.databricksServerHostname = databricksServerHostname;
this.databricksHttpPath = databricksHttpPath;
this.databricksPort = databricksPort;
this.databricksPersonalAccessToken = databricksPersonalAccessToken;
this.databricksCatalog = databricksCatalog;
this.databaseSchema = databaseSchema;
this.purgeStagingData = purgeStagingData;
this.useMetastore = useMetastore;
this.storageConfig = storageConfig;
}

@@ -47,8 +55,10 @@ public static DatabricksDestinationConfig get(final JsonNode config) {
config.get("databricks_http_path").asText(),
config.has("databricks_port") ? config.get("databricks_port").asText() : DEFAULT_DATABRICKS_PORT,
config.get("databricks_personal_access_token").asText(),
config.has("databricks_catalog") ? config.get("databricks_catalog").asText() : DEFAULT_DATABRICKS_CATALOG,
config.has("database_schema") ? config.get("database_schema").asText() : DEFAULT_DATABASE_SCHEMA,
config.has("purge_staging_data") ? config.get("purge_staging_data").asBoolean() : DEFAULT_PURGE_STAGING_DATA,
config.has("use_metastore") ? config.get("use_metastore").asBoolean() : DEFAULT_USE_METASTORE,
DatabricksStorageConfig.getDatabricksStorageConfig(config.get("data_source")));
}

@@ -68,6 +78,10 @@ public String getDatabricksPersonalAccessToken() {
return databricksPersonalAccessToken;
}

public String getDatabricksCatalog() {
return databricksCatalog;
}

public String getDatabaseSchema() {
return databaseSchema;
}
@@ -76,6 +90,10 @@ public boolean isPurgeStagingData() {
return purgeStagingData;
}

public boolean isUseMetastore() {
return useMetastore;
}

public DatabricksStorageConfig getStorageConfig() {
return storageConfig;
}
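Both new keys are optional, and their defaults preserve the old behavior (the metastore is used and the catalog is ignored), so existing configurations keep working unchanged. A minimal sketch of the resolution in get(...), with the default literals written out since the constants are package-private:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

final JsonNode config = new ObjectMapper().createObjectNode(); // neither key set
final String catalog = config.has("databricks_catalog")
    ? config.get("databricks_catalog").asText()
    : "airbyte"; // DEFAULT_DATABRICKS_CATALOG
final boolean useMetastore = config.has("use_metastore")
    ? config.get("use_metastore").asBoolean()
    : true; // DEFAULT_USE_METASTORE
// catalog == "airbyte", useMetastore == true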
@@ -50,6 +50,7 @@ public class DatabricksS3StreamCopier extends DatabricksStreamCopier {
private final S3ParquetWriter parquetWriter;

public DatabricksS3StreamCopier(final String stagingFolder,
final String catalog,
final String schema,
final ConfiguredAirbyteStream configuredStream,
final AmazonS3 s3Client,
@@ -60,7 +61,7 @@ public DatabricksS3StreamCopier(final String stagingFolder,
final S3WriterFactory writerFactory,
final Timestamp uploadTime)
throws Exception {
super(stagingFolder, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
super(stagingFolder, catalog, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);
this.s3Client = s3Client;
this.s3Config = databricksConfig.getStorageConfig().getS3DestinationConfigOrThrow();
final S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder);
@@ -103,17 +104,24 @@ protected String getDestTableLocation() {

@Override
protected String getCreateTempTableStatement() {
if (!useMetastore) {
return String.format("CREATE TABLE %s.%s.%s USING parquet LOCATION '%s';", catalogName, schemaName, tmpTableName, getTmpTableLocation());
}
return String.format("CREATE TABLE %s.%s USING parquet LOCATION '%s';", schemaName, tmpTableName, getTmpTableLocation());
}

@Override
public String generateMergeStatement(final String destTableName) {
String namespace = String.format("%s.%s", schemaName, destTableName);
if (!useMetastore) {
namespace = String.format("%s.%s.%s", catalogName, schemaName, destTableName);
}
final String copyData = String.format(
"COPY INTO %s.%s " +
"COPY INTO %s " +
"FROM '%s' " +
"FILEFORMAT = PARQUET " +
"PATTERN = '%s'",
schemaName, destTableName,
namespace,
getTmpTableLocation(),
parquetWriter.getOutputFilename());
LOGGER.info(copyData);
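COPY INTO takes a single fully qualified target, which is why the method now precomputes namespace once instead of interpolating schema and table separately. With use_metastore set to false, the statement renders roughly as follows (catalog, schema, table, and paths are all illustrative):

// Illustrative rendering of the COPY INTO built above.
final String example =
    "COPY INTO airbyte.public.users "
        + "FROM 's3://my-bucket/staging/users' "
        + "FILEFORMAT = PARQUET "
        + "PATTERN = 'users_0001.parquet'";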
Expand Up @@ -29,12 +29,13 @@ public StreamCopier create(final String configuredSchema,
try {
final AirbyteStream stream = configuredStream.getStream();
final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
final String catalog = databricksConfig.getDatabricksCatalog();

S3DestinationConfig s3Config = databricksConfig.getStorageConfig().getS3DestinationConfigOrThrow();
final AmazonS3 s3Client = s3Config.getS3Client();
final ProductionWriterFactory writerFactory = new ProductionWriterFactory();
final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
return new DatabricksS3StreamCopier(stagingFolder, schema, configuredStream, s3Client, database,
return new DatabricksS3StreamCopier(stagingFolder, catalog, schema, configuredStream, s3Client, database,
databricksConfig, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
} catch (final Exception e) {
throw new RuntimeException(e);
@@ -38,6 +38,10 @@ public String createTableQuery(final JdbcDatabase database, final String schemaName
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

public void createCatalogIfNotExists(final JdbcDatabase database, final String catalogName) throws Exception {
database.execute(String.format("create catalog if not exists %s;", catalogName));
}

@Override
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
database.execute(String.format("create database if not exists %s;", schemaName));
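A usage sketch for the new helper (names illustrative): in three-level mode the copier creates the catalog first, then the schema inside it. CREATE CATALOG only succeeds on Unity Catalog-enabled workspaces, which is why the feature is gated behind use_metastore = false.

// Emits: create catalog if not exists analytics;
sqlOperations.createCatalogIfNotExists(database, "analytics");
// Emits: create database if not exists analytics.public;
sqlOperations.createSchemaIfNotExists(database, "analytics.public");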
@@ -29,10 +29,12 @@ public abstract class DatabricksStreamCopier implements StreamCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class);

protected final String catalogName;
protected final String schemaName;
protected final String streamName;
protected final DestinationSyncMode destinationSyncMode;
private final boolean purgeStagingData;
protected final boolean useMetastore;
protected final JdbcDatabase database;
protected final DatabricksSqlOperations sqlOperations;

@@ -44,16 +46,19 @@ public abstract class DatabricksStreamCopier implements StreamCopier {
protected final DatabricksDestinationConfig databricksConfig;

public DatabricksStreamCopier(final String stagingFolder,
final String catalog,
final String schema,
final ConfiguredAirbyteStream configuredStream,
final JdbcDatabase database,
final DatabricksDestinationConfig databricksConfig,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
this.catalogName = catalog;
this.schemaName = schema;
this.streamName = configuredStream.getStream().getName();
this.destinationSyncMode = configuredStream.getDestinationSyncMode();
this.purgeStagingData = databricksConfig.isPurgeStagingData();
this.useMetastore = databricksConfig.isUseMetastore();
this.database = database;
this.sqlOperations = (DatabricksSqlOperations) sqlOperations;

@@ -75,8 +80,15 @@ public DatabricksStreamCopier(final String stagingFolder,

@Override
public void createDestinationSchema() throws Exception {
LOGGER.info("[Stream {}] Creating database schema if it does not exist: {}", streamName, schemaName);
sqlOperations.createSchemaIfNotExists(database, schemaName);
if (!useMetastore) {
LOGGER.info("[Stream {}] Creating databricks catalog if it does not exist: {}", streamName, catalogName);
sqlOperations.createCatalogIfNotExists(database, catalogName);
LOGGER.info("[Stream {}] Creating database schema if it does not exist: {}.{}", streamName, catalogName, schemaName);
sqlOperations.createSchemaIfNotExists(database, String.format("%s.%s", catalogName, schemaName));
} else {
LOGGER.info("[Stream {}] Creating database schema if it does not exist: {}", streamName, schemaName);
sqlOperations.createSchemaIfNotExists(database, schemaName);
}
}

@Override
@@ -108,21 +120,28 @@ public String createDestinationTable() throws Exception {
? "CREATE OR REPLACE TABLE"
: "CREATE TABLE IF NOT EXISTS";

String destNamespace = String.format("%s.%s", schemaName, destTableName);
String tmpNamespace = String.format("%s.%s", schemaName, tmpTableName);
if (!useMetastore) {
destNamespace = String.format("%s.%s.%s", catalogName, schemaName, destTableName);
tmpNamespace = String.format("%s.%s.%s", catalogName, schemaName, tmpTableName);
}

final String createTable = String.format(
"%s %s.%s " +
"%s %s " +
"USING delta " +
"LOCATION '%s' " +
"COMMENT 'Created from stream %s' " +
"TBLPROPERTIES ('airbyte.destinationSyncMode' = '%s', %s) " +
// create the table based on the schema of the tmp table
"AS SELECT * FROM %s.%s LIMIT 0",
"AS SELECT * FROM %s LIMIT 0",
createStatement,
schemaName, destTableName,
destNamespace,
getDestTableLocation(),
streamName,
destinationSyncMode.value(),
String.join(", ", DatabricksConstants.DEFAULT_TBL_PROPERTIES),
schemaName, tmpTableName);
tmpNamespace);
LOGGER.info(createTable);
database.execute(createTable);

@@ -133,7 +152,11 @@ public void removeFileAndDropTmpTable() throws Exception {
public void removeFileAndDropTmpTable() throws Exception {
if (purgeStagingData) {
LOGGER.info("[Stream {}] Deleting tmp table: {}", streamName, tmpTableName);
sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
if (!useMetastore) {
sqlOperations.dropTableIfExists(database, String.format("%s.%s", catalogName, schemaName), tmpTableName);
} else {
sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
}

deleteStagingFile();
}
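Taken together, createDestinationTable() in three-level mode renders DDL along these lines (a sketch; identifiers and the location are illustrative, and the default table properties are abridged since they are defined outside this diff):

// Example shape of the generated statement for stream "users" with
// catalog "airbyte" and schema "public".
final String example =
    "CREATE TABLE IF NOT EXISTS airbyte.public.users "
        + "USING delta "
        + "LOCATION 's3://my-bucket/data/users' "
        + "COMMENT 'Created from stream users' "
        + "TBLPROPERTIES ('airbyte.destinationSyncMode' = 'append', ...) "
        + "AS SELECT * FROM airbyte.public._airbyte_tmp_xyz_users LIMIT 0";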
@@ -53,13 +53,21 @@
"airbyte_secret": true,
"order": 5
},
"databricks_catalog": {
"title": "Databricks Catalog",
"type": "string",
"description": "The default catalog tables are written to.",
"default": "airbyte",
"examples": ["airbyte"],
"order": 6
},
"database_schema": {
"title": "Database Schema",
"type": "string",
"description": "The default schema tables are written to if the source does not specify a namespace. Unless specifically configured, the usual value for this field is \"public\".",
"default": "public",
"examples": ["public"],
"order": 6
"order": 7
},
"data_source": {
"title": "Data Source",
@@ -213,14 +221,21 @@
}
}
],
"order": 7
"order": 8
},
"purge_staging_data": {
"title": "Purge Staging Files and Tables",
"type": "boolean",
"description": "Default to 'true'. Switch it to 'false' for debugging purpose.",
"default": true,
"order": 8
"order": 9
},
"use_metastore": {
"title": "Use Metastore as Catalog",
"type": "boolean",
"description": "Default to 'true'. Switch it to 'false' using three level namespace and create new catalog (this needs Unity Catalog to be activated).",
"default": true,
"order": 10
}
}
}
