-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Copy job attempt state to configs database (#7219)
* Add migration to create latest state table * Log migration name * Expose db variables to airbyte-db * Implement migration * Fix migration test * temp * Rebase on master * Save state in temporal (#7253) * Copy state to airbyte_configs table * Add standard sync state * Move state methods to config repository * Add unit tests * Fix unit tests * Register standard sync state in migration * Add comment * Use config model instead of json node * Add comments * Remove unnecessary method * Fix migration query * Remove unused config database * Move persist statement and log the call * Update dev doc * Add unit tests for sync workflow Co-authored-by: Charles <[email protected]>
- Loading branch information
Showing
51 changed files
with
1,278 additions
and
440 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
17 changes: 17 additions & 0 deletions
17
airbyte-config/models/src/main/resources/types/StandardSyncState.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
--- | ||
"$schema": http://json-schema.org/draft-07/schema# | ||
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardSyncState.yaml | ||
title: StandardSyncState | ||
description: The current state of a connection (i.e. StandardSync). | ||
type: object | ||
additionalProperties: false | ||
required: | ||
- connectionId | ||
properties: | ||
connectionId: | ||
type: string | ||
format: uuid | ||
description: This is a foreign key that references a connection (i.e. StandardSync). | ||
state: | ||
"$ref": State.yaml | ||
description: The current (latest) connection state. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
59 changes: 59 additions & 0 deletions
59
airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db.instance.configs; | ||
|
||
import static org.jooq.impl.DSL.field; | ||
import static org.jooq.impl.DSL.table; | ||
|
||
import io.airbyte.config.ConfigSchema; | ||
import io.airbyte.db.Database; | ||
import io.airbyte.db.ExceptionWrappingDatabase; | ||
import io.airbyte.db.instance.DatabaseMigrator; | ||
import io.airbyte.db.instance.test.TestDatabaseProvider; | ||
import java.io.IOException; | ||
import java.time.OffsetDateTime; | ||
import java.util.UUID; | ||
import org.jooq.JSONB; | ||
|
||
public class ConfigsDatabaseTestProvider implements TestDatabaseProvider { | ||
|
||
private final String user; | ||
private final String password; | ||
private final String jdbcUrl; | ||
|
||
public ConfigsDatabaseTestProvider(final String user, final String password, final String jdbcUrl) { | ||
this.user = user; | ||
this.password = password; | ||
this.jdbcUrl = jdbcUrl; | ||
} | ||
|
||
@Override | ||
public Database create(final boolean runMigration) throws IOException { | ||
final Database database = new ConfigsDatabaseInstance(user, password, jdbcUrl) | ||
.getAndInitialize(); | ||
|
||
if (runMigration) { | ||
final DatabaseMigrator migrator = new ConfigsDatabaseMigrator( | ||
database, | ||
ConfigsDatabaseTestProvider.class.getSimpleName()); | ||
migrator.createBaseline(); | ||
migrator.migrate(); | ||
} | ||
|
||
// The configs database is considered ready only if there are some seed records. | ||
// So we need to create at least one record here. | ||
final OffsetDateTime timestamp = OffsetDateTime.now(); | ||
new ExceptionWrappingDatabase(database).transaction(ctx -> ctx.insertInto(table("airbyte_configs")) | ||
.set(field("config_id"), UUID.randomUUID().toString()) | ||
.set(field("config_type"), ConfigSchema.STATE.name()) | ||
.set(field("config_blob"), JSONB.valueOf("{}")) | ||
.set(field("created_at"), timestamp) | ||
.set(field("updated_at"), timestamp) | ||
.execute()); | ||
|
||
return database; | ||
} | ||
|
||
} |
Oops, something went wrong.