-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backfill specs into definitions #7616
Changes from 5 commits
f74b6e0
ba9b909
ab47c44
755747a
121fe31
2bb7959
e4b623c
308da78
8b2c29f
d26bb3f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,19 @@ | |
|
||
package io.airbyte.server; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import io.airbyte.analytics.Deployment; | ||
import io.airbyte.analytics.TrackingClient; | ||
import io.airbyte.analytics.TrackingClientSingleton; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.commons.lang.Exceptions; | ||
import io.airbyte.commons.resources.MoreResources; | ||
import io.airbyte.commons.version.AirbyteVersion; | ||
import io.airbyte.config.Configs; | ||
import io.airbyte.config.Configs.WorkerEnvironment; | ||
import io.airbyte.config.EnvConfigs; | ||
import io.airbyte.config.StandardDestinationDefinition; | ||
import io.airbyte.config.StandardSourceDefinition; | ||
import io.airbyte.config.StandardWorkspace; | ||
import io.airbyte.config.helpers.LogClientSingleton; | ||
import io.airbyte.config.persistence.ConfigPersistence; | ||
|
@@ -27,11 +31,13 @@ | |
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; | ||
import io.airbyte.db.instance.jobs.JobsDatabaseInstance; | ||
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; | ||
import io.airbyte.protocol.models.ConnectorSpecification; | ||
import io.airbyte.scheduler.client.BucketSpecCacheSchedulerClient; | ||
import io.airbyte.scheduler.client.DefaultSchedulerJobClient; | ||
import io.airbyte.scheduler.client.DefaultSynchronousSchedulerClient; | ||
import io.airbyte.scheduler.client.SchedulerJobClient; | ||
import io.airbyte.scheduler.client.SpecCachingSynchronousSchedulerClient; | ||
import io.airbyte.scheduler.client.SynchronousResponse; | ||
import io.airbyte.scheduler.client.SynchronousSchedulerClient; | ||
import io.airbyte.scheduler.persistence.DefaultJobCreator; | ||
import io.airbyte.scheduler.persistence.DefaultJobPersistence; | ||
|
@@ -235,6 +241,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con | |
runFlywayMigration(configs, configDatabase, jobDatabase); | ||
configPersistence.loadData(seed); | ||
|
||
// todo (lmossman) - this will only exist temporarily to ensure all definitions contain specs. It | ||
// will be removed after the faux major version bump | ||
migrateAllDefinitionsToContainSpec(configRepository, cachingSchedulerClient); | ||
|
||
return apiFactory.create( | ||
schedulerJobClient, | ||
cachingSchedulerClient, | ||
|
@@ -256,6 +266,85 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con | |
} | ||
} | ||
|
||
/** | ||
* Check that each spec in the database has a spec. If it doesn't, add it. If it can't be added, | ||
* track the failure in Segment. The goal is to try to end up in a state where all definitions in | ||
* the db contain specs, and to understand what is stopping us from getting there. | ||
* | ||
* @param configRepository - access to the db | ||
* @param schedulerClient - scheduler client so that specs can be fetched as needed | ||
*/ | ||
private static void migrateAllDefinitionsToContainSpec(final ConfigRepository configRepository, | ||
final SynchronousSchedulerClient schedulerClient) | ||
throws Exception { | ||
for (final StandardSourceDefinition sourceDef : configRepository.listStandardSourceDefinitions()) { | ||
try { | ||
if (sourceDef.getSpec() == null) { | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Source Definition {} does not have a spec. Attempting to retrieve spec...", | ||
sourceDef.getName()); | ||
final SynchronousResponse<ConnectorSpecification> getSpecJob = schedulerClient | ||
.createGetSpecJob(sourceDef.getDockerRepository() + ":" + sourceDef.getDockerImageTag()); | ||
if (getSpecJob.isSuccess()) { | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Spec for Source Definition {} was successfully retrieved. Writing to the db...", | ||
sourceDef.getName()); | ||
final StandardSourceDefinition updatedDef = Jsons.clone(sourceDef).withSpec(getSpecJob.getOutput()); | ||
configRepository.writeStandardSourceDefinition(updatedDef); | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Spec for Source Definition {} was successfully written to the db record.", | ||
sourceDef.getName()); | ||
} else { | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Failed to retrieve spec for Source Definition {}.", | ||
sourceDef.getName()); | ||
throw new RuntimeException(String.format("Failed to retrieve spec for Source Definition %s", sourceDef.getName())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately if we hit this case where the getSpecJob is unsuccessful, we won't have much useful information to report to Segment besides the fact that it failed for that docker image. Is there any way to glean more info about how the temporal job failed in this case? The only thing I could find was getting the log path which doesn't sound very useful to have in segment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think the |
||
} | ||
} | ||
} catch (final Exception e) { | ||
trackSpecBackfillFailure(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag(), e); | ||
} | ||
} | ||
|
||
for (final StandardDestinationDefinition destDef : configRepository.listStandardDestinationDefinitions()) { | ||
try { | ||
if (destDef.getSpec() == null) { | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Destination Definition {} does not have a spec. Attempting to retrieve spec...", | ||
destDef.getName()); | ||
final SynchronousResponse<ConnectorSpecification> getSpecJob = schedulerClient | ||
.createGetSpecJob(destDef.getDockerRepository() + ":" + destDef.getDockerImageTag()); | ||
if (getSpecJob.isSuccess()) { | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Spec for Destination Definition {} was successfully retrieved. Writing to the db...", | ||
destDef.getName()); | ||
final StandardDestinationDefinition updatedDef = Jsons.clone(destDef).withSpec(getSpecJob.getOutput()); | ||
configRepository.writeStandardDestinationDefinition(updatedDef); | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Spec for Destination Definition {} was successfully written to the db record.", | ||
destDef.getName()); | ||
} else { | ||
LOGGER.info( | ||
"migrateAllDefinitionsToContainSpec - Failed to retrieve spec for Destination Definition {}.", | ||
destDef.getName()); | ||
throw new RuntimeException(String.format("Failed to retrieve spec for Destination Definition %s", destDef.getName())); | ||
} | ||
} | ||
} catch (final Exception e) { | ||
trackSpecBackfillFailure(destDef.getDockerRepository(), destDef.getDockerImageTag(), e); | ||
} | ||
} | ||
} | ||
|
||
private static void trackSpecBackfillFailure(final String dockerRepo, final String dockerImageTag, final Exception exception) { | ||
final TrackingClient trackingClient = TrackingClientSingleton.get(); | ||
final ImmutableMap<String, Object> metadata = ImmutableMap.of( | ||
"docker_image_name", dockerRepo, | ||
"docker_image_tag", dockerImageTag, | ||
"exception", exception); | ||
trackingClient.track(UUID.fromString("000000"), "failed_spec_backfill", metadata); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add a comment explaining why we are going with this uuid. otherwise the next person who reads it will come ask you 😛 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually had to get rid of this and use a real workspace ID, because the tracking client yelled at me if I tried to pass in 0s. |
||
} | ||
|
||
@Deprecated | ||
@SuppressWarnings({"DeprecatedIsStillUsed"}) | ||
private static Optional<AirbyteVersion> runFileMigration(final AirbyteVersion airbyteVersion, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a test we can write for this? If we hit edge cases here, they will be really hard to fix (especially after the major version bump).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just added a test suite for this. I also tested the success and failure flows locally, and everything seems to be working as expected. Let me know if you think that is enough or if we need to test more.