Skip to content

Commit

Permalink
🎉 Migrate config persistence to database (#4670)
Browse files Browse the repository at this point in the history
* Implement db config persistence

* Fix database readiness check

* Reduce logging noise

* Setup config database in config persistence factory

* Update documentation

* Load seed from yaml files

* Refactor config persistence factory

* Add one more test to mimic migration

* Remove unnecessary changes

* Run code formatter

* Update placeholder env values

* Set default config database parameters in docker compose

Co-authored-by: Christophe Duong <[email protected]>

* Default setupDatabase to false

* Rename variable

* Set default config db parameters for server

* Remove config db parameters from the env file

* Remove unnecessary environment statements

* Hide config persistence factory (#4772)

* Remove CONFIG_DATABASE_HOST

* Use builder in the test

* Simplify config persistence builder

* Clarify config db connection readiness

* Format code

* Add logging

* Fix typo

Co-authored-by: Christophe Duong <[email protected]>

* Add a config_id only index

* Reuse record insertion code

* Add id field name to config schema

* Support data loading from legacy config schemas

* Log missing logs in migration test

* Move airbyte configs table to separate directory

* Update exception message

* Dump specific tables from the job database

* Remove postgres specific uuid extension

* Comment out future branch

* Default configs db variables to empty

When defaulting them to the jobs db variables, it somehow does not work.

* Log inserted config records

* Log all db write operations

* Add back config db variables in env file to mute warnings

* Log connection exception to debug flaky e2e test

* Leave config db variables empty

`.env` file does not support variable expansion.

Co-authored-by: Christophe Duong <[email protected]>
Co-authored-by: Charles <[email protected]>
  • Loading branch information
3 people authored Jul 19, 2021
1 parent f4018c5 commit e577b49
Show file tree
Hide file tree
Showing 30 changed files with 1,521 additions and 100 deletions.
8 changes: 7 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
VERSION=0.27.3-alpha

# Airbyte Internal Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db
# Airbyte Internal Job Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db
DATABASE_USER=docker
DATABASE_PASSWORD=docker
DATABASE_HOST=db
Expand All @@ -9,6 +9,12 @@ DATABASE_DB=airbyte
# translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT/${DATABASE_DB}
DATABASE_URL=jdbc:postgresql://db:5432/airbyte

# Airbyte Internal Config Database, default to reuse the Job Database when they are empty
# Usually you do not need to set them; they are explicitly left empty to mute docker compose warnings
CONFIG_DATABASE_USER=
CONFIG_DATABASE_PASSWORD=
CONFIG_DATABASE_URL=

# When using the airbyte-db via default docker image:
CONFIG_ROOT=/data
DATA_DOCKER_MOUNT=airbyte_data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- workspaceId: 5ae6b09b-fdec-41af-aaf7-7d94cfc33ef6
name: default
slug: default
initialSetupComplete: false
displaySetupWizard: true
tombstone: false
Original file line number Diff line number Diff line change
Expand Up @@ -32,82 +32,75 @@
public enum ConfigSchema {

// workspace
STANDARD_WORKSPACE("StandardWorkspace.yaml", StandardWorkspace.class, standardWorkspace -> {
return standardWorkspace.getWorkspaceId().toString();
}),
STANDARD_WORKSPACE("StandardWorkspace.yaml",
StandardWorkspace.class,
standardWorkspace -> standardWorkspace.getWorkspaceId().toString(),
"workspaceId"),

// source
STANDARD_SOURCE_DEFINITION("StandardSourceDefinition.yaml", StandardSourceDefinition.class,
standardSourceDefinition -> {
return standardSourceDefinition.getSourceDefinitionId().toString();
}),
SOURCE_CONNECTION("SourceConnection.yaml", SourceConnection.class,
sourceConnection -> {
return sourceConnection.getSourceId().toString();
}),
STANDARD_SOURCE_DEFINITION("StandardSourceDefinition.yaml",
StandardSourceDefinition.class,
standardSourceDefinition -> standardSourceDefinition.getSourceDefinitionId().toString(),
"sourceDefinitionId"),
SOURCE_CONNECTION("SourceConnection.yaml",
SourceConnection.class,
sourceConnection -> sourceConnection.getSourceId().toString(),
"sourceId"),

// destination
STANDARD_DESTINATION_DEFINITION("StandardDestinationDefinition.yaml",
StandardDestinationDefinition.class, standardDestinationDefinition -> {
return standardDestinationDefinition.getDestinationDefinitionId().toString();
}),
DESTINATION_CONNECTION("DestinationConnection.yaml", DestinationConnection.class,
destinationConnection -> {
return destinationConnection.getDestinationId().toString();
}),
StandardDestinationDefinition.class,
standardDestinationDefinition -> standardDestinationDefinition.getDestinationDefinitionId().toString(),
"destinationDefinitionId"),
DESTINATION_CONNECTION("DestinationConnection.yaml",
DestinationConnection.class,
destinationConnection -> destinationConnection.getDestinationId().toString(),
"destinationId"),

// sync
STANDARD_SYNC("StandardSync.yaml", StandardSync.class, standardSync -> {
return standardSync.getConnectionId().toString();
}),
STANDARD_SYNC_OPERATION("StandardSyncOperation.yaml", StandardSyncOperation.class,
standardSyncOperation -> {
return standardSyncOperation.getOperationId().toString();
}),
STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class,
standardSyncSummary -> {
throw new RuntimeException("StandardSyncSummary doesn't have an id");
}),
STANDARD_SYNC("StandardSync.yaml",
StandardSync.class,
standardSync -> standardSync.getConnectionId().toString(),
"connectionId"),
STANDARD_SYNC_OPERATION("StandardSyncOperation.yaml",
StandardSyncOperation.class,
standardSyncOperation -> standardSyncOperation.getOperationId().toString(),
"operationId"),
STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),

// worker
STANDARD_SYNC_INPUT("StandardSyncInput.yaml", StandardSyncInput.class,
standardSyncInput -> {
throw new RuntimeException("StandardSyncInput doesn't have an id");
}),
NORMALIZATION_INPUT("NormalizationInput.yaml", NormalizationInput.class,
normalizationInput -> {
throw new RuntimeException("NormalizationInput doesn't have an id");
}),
OPERATOR_DBT_INPUT("OperatorDbtInput.yaml", OperatorDbtInput.class,
operatorDbtInput -> {
throw new RuntimeException("OperatorDbtInput doesn't have an id");
}),

STANDARD_SYNC_OUTPUT("StandardSyncOutput.yaml", StandardSyncOutput.class,
standardWorkspace -> {
throw new RuntimeException("StandardSyncOutput doesn't have an id");
}),
REPLICATION_OUTPUT("ReplicationOutput.yaml", ReplicationOutput.class,
standardWorkspace -> {
throw new RuntimeException("ReplicationOutput doesn't have an id");
}),

STATE("State.yaml", State.class, standardWorkspace -> {
throw new RuntimeException("State doesn't have an id");
});
STANDARD_SYNC_INPUT("StandardSyncInput.yaml", StandardSyncInput.class),
NORMALIZATION_INPUT("NormalizationInput.yaml", NormalizationInput.class),
OPERATOR_DBT_INPUT("OperatorDbtInput.yaml", OperatorDbtInput.class),
STANDARD_SYNC_OUTPUT("StandardSyncOutput.yaml", StandardSyncOutput.class),
REPLICATION_OUTPUT("ReplicationOutput.yaml", ReplicationOutput.class),
STATE("State.yaml", State.class);

static final Path KNOWN_SCHEMAS_ROOT = JsonSchemas.prepareSchemas("types", ConfigSchema.class);

private final String schemaFilename;
private final Class<?> className;
private final Function<?, String> extractId;
private final String idFieldName;

<T> ConfigSchema(final String schemaFilename,
Class<T> className,
Function<T, String> extractId) {
Function<T, String> extractId,
String idFieldName) {
this.schemaFilename = schemaFilename;
this.className = className;
this.extractId = extractId;
this.idFieldName = idFieldName;
}

<T> ConfigSchema(final String schemaFilename,
Class<T> className) {
this.schemaFilename = schemaFilename;
this.className = className;
this.extractId = object -> {
throw new RuntimeException(className.getSimpleName() + " doesn't have an id");
};
this.idFieldName = null;
}

public File getFile() {
Expand All @@ -125,4 +118,8 @@ public <T> String getId(T object) {
throw new RuntimeException("Object: " + object + " is not instance of class " + getClassName().getName());
}

public String getIdFieldName() {
return idFieldName;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config;

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
* When migrating configs, it is possible that some of the old config types have been removed from
* the codebase. So we cannot rely on the latest {@link ConfigSchema} to migrate them. This class
* provides backward compatibility for those legacy config types during migration.
*/
public class ConfigSchemaMigrationSupport {

// a map from config schema to its id field names
public static final Map<String, String> CONFIG_SCHEMA_ID_FIELD_NAMES;

static {
Map<String, String> currentConfigSchemaIdNames = Arrays.stream(ConfigSchema.values())
.filter(configSchema -> configSchema.getIdFieldName() != null)
.collect(Collectors.toMap(Enum::name, ConfigSchema::getIdFieldName));
CONFIG_SCHEMA_ID_FIELD_NAMES = new ImmutableMap.Builder<String, String>()
.putAll(currentConfigSchemaIdNames)
// add removed config schema and its id field names below
// https://github.com/airbytehq/airbyte/pull/41
.put("SOURCE_CONNECTION_CONFIGURATION", "sourceSpecificationId")
.put("DESTINATION_CONNECTION_CONFIGURATION", "destinationSpecificationId")
// https://github.com/airbytehq/airbyte/pull/528
.put("SOURCE_CONNECTION_SPECIFICATION", "sourceSpecificationId")
.put("DESTINATION_CONNECTION_SPECIFICATION", "destinationSpecificationId")
// https://github.com/airbytehq/airbyte/pull/564
.put("STANDARD_SOURCE", "sourceId")
.put("STANDARD_DESTINATION", "destinationId")
.put("SOURCE_CONNECTION_IMPLEMENTATION", "sourceImplementationId")
.put("DESTINATION_CONNECTION_IMPLEMENTATION", "destinationImplementationId")
// https://github.com/airbytehq/airbyte/pull/3472
.put("STANDARD_SYNC_SCHEDULE", "connectionId")
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public interface Configs {

String getDatabaseUrl();

String getConfigDatabaseUser();

String getConfigDatabasePassword();

String getConfigDatabaseUrl();

String getWebappUrl();

String getWorkspaceDockerMount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class EnvConfigs implements Configs {
public static final String DATABASE_USER = "DATABASE_USER";
public static final String DATABASE_PASSWORD = "DATABASE_PASSWORD";
public static final String DATABASE_URL = "DATABASE_URL";
public static final String CONFIG_DATABASE_USER = "CONFIG_DATABASE_USER";
public static final String CONFIG_DATABASE_PASSWORD = "CONFIG_DATABASE_PASSWORD";
public static final String CONFIG_DATABASE_URL = "CONFIG_DATABASE_URL";
public static final String WEBAPP_URL = "WEBAPP_URL";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
Expand Down Expand Up @@ -127,6 +130,24 @@ public String getDatabaseUrl() {
return getEnsureEnv(DATABASE_URL);
}

@Override
public String getConfigDatabaseUser() {
// Default to reuse the job database
return getEnvOrDefault(CONFIG_DATABASE_USER, getDatabaseUser());
}

@Override
public String getConfigDatabasePassword() {
// Default to reuse the job database
return getEnvOrDefault(CONFIG_DATABASE_PASSWORD, getDatabasePassword());
}

@Override
public String getConfigDatabaseUrl() {
// Default to reuse the job database
return getEnvOrDefault(CONFIG_DATABASE_URL, getDatabaseUrl());
}

@Override
public String getWebappUrl() {
return getEnsureEnv(WEBAPP_URL);
Expand Down Expand Up @@ -255,10 +276,10 @@ private long getEnvOrDefault(String key, long defaultValue) {

private <T> T getEnvOrDefault(String key, T defaultValue, Function<String, T> parser) {
final String value = getEnv.apply(key);
if (value != null) {
if (value != null && !value.isEmpty()) {
return parser.apply(value);
} else {
LOGGER.info(key + " not found, defaulting to " + defaultValue);
LOGGER.info(key + " not found or empty, defaulting to " + defaultValue);
return defaultValue;
}
}
Expand Down
6 changes: 5 additions & 1 deletion airbyte-config/persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
dependencies {
implementation group: 'commons-io', name: 'commons-io', version: '2.7'

implementation project(':airbyte-db')
implementation project(':airbyte-config:models')
implementation project(":airbyte-json-validation")
implementation project(':airbyte-config:init')
implementation project(':airbyte-json-validation')

testImplementation "org.testcontainers:postgresql:1.15.1"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config.persistence;

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import java.sql.Timestamp;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Table;

public class AirbyteConfigsTable {

public static final String AIRBYTE_CONFIGS_TABLE_SCHEMA = "airbyte_configs_table.sql";

public static final Table<Record> AIRBYTE_CONFIGS = table("airbyte_configs");
public static final Field<String> CONFIG_ID = field("config_id", String.class);
public static final Field<String> CONFIG_TYPE = field("config_type", String.class);
public static final Field<JSONB> CONFIG_BLOB = field("config_blob", JSONB.class);
public static final Field<Timestamp> CREATED_AT = field("created_at", Timestamp.class);
public static final Field<Timestamp> UPDATED_AT = field("updated_at", Timestamp.class);

}
Loading

0 comments on commit e577b49

Please sign in to comment.