Backfill specs into definitions #7616
LOGGER.info(
    "migrateAllDefinitionsToContainSpec - Failed to retrieve spec for Source Definition {}.",
    sourceDef.getName());
throw new RuntimeException(String.format("Failed to retrieve spec for Source Definition %s", sourceDef.getName()));
Unfortunately, if we hit this case where the getSpecJob is unsuccessful, we won't have much useful information to report to Segment besides the fact that it failed for that docker image. Is there any way to glean more info about how the temporal job failed in this case? The only thing I could find was the log path, which doesn't sound very useful to have in Segment.
i think the Synchronous response object that is getSpecJob can help: JobConverters.getLogRead(getSpecJob.getMetadata().getLogPath(getLogPath)); essentially that object has a log path, and then the converters class has code to read from that log path. it might be too much info to send though... you'd have to play with it a bit to see if it is useful.
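If the log does turn out to be too much to send, one option is to cap it to the last few lines before attaching it to the tracking metadata. The sketch below is a minimal illustration, assuming the log lines are available as a List<String> (the LogRead printed later in this thread shows a logLines field). LogTailSketch and tail are hypothetical names, not part of the Airbyte codebase.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not an Airbyte class: trims a log to its final lines.
class LogTailSketch {

  // Returns a copy of at most maxLines lines taken from the end of logLines.
  static List<String> tail(final List<String> logLines, final int maxLines) {
    if (maxLines < 0) {
      throw new IllegalArgumentException("maxLines must be non-negative");
    }
    // Start index of the tail; 0 when the log is shorter than maxLines.
    final int from = Math.max(0, logLines.size() - maxLines);
    return new ArrayList<>(logLines.subList(from, logLines.size()));
  }
}
```

The end of the log is kept rather than the start because, in a failed job, the exception and its causes are usually written last.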
    "docker_image_name", dockerRepo,
    "docker_image_tag", dockerImageTag,
    "exception", exception);
trackingClient.track(UUID.fromString("000000"), "failed_spec_backfill", metadata);
please add a comment explaining why we are going with this uuid. otherwise the next person who reads it will come ask you 😛
I actually had to get rid of this and use a real workspace ID, because the tracking client yelled at me if I tried to pass in 0s.
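The thread does not say why the tracking client rejected the zeros, so the following is only a side note on the Java side of it. As far as java.util.UUID is concerned, the literal "000000" is not a parseable UUID string, whereas the all-zero UUID can be built directly from its two 64-bit halves. PlaceholderUuidSketch is a hypothetical name used for illustration, and whether the tracking client would accept an all-zero UUID is not something this sketch can confirm.

```java
import java.util.UUID;

// Hypothetical illustration class, not part of the Airbyte codebase.
class PlaceholderUuidSketch {

  // The all-zero ("nil") UUID, built from its most and least significant bits.
  static final UUID NIL_UUID = new UUID(0L, 0L);

  // Returns true if the string parses as a UUID, false if UUID.fromString rejects it.
  static boolean isParseableUuid(final String candidate) {
    try {
      UUID.fromString(candidate);
      return true;
    } catch (final IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(final String[] args) {
    System.out.println(isParseableUuid("000000")); // false
    System.out.println(NIL_UUID); // 00000000-0000-0000-0000-000000000000
  }
}
```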
 * @param configRepository - access to the db
 * @param schedulerClient - scheduler client so that specs can be fetched as needed
 */
private static void migrateAllDefinitionsToContainSpec(final ConfigRepository configRepository,
is there a test we can write for this? this is one of those things that if there are edge cases that we hit, it will be really hard to fix (especially after the major version bump).
Just added a test suite for this, and I also tested the success and failure flows locally and everything seems to be working as expected. Lmk if you think that is enough or if we need to test more
    "migrateAllDefinitionsToContainSpec - Spec for Destination Definition {} was successfully written to the db record.",
    destDef.getName());
} else {
  final LogRead logRead = jobConverter.getLogRead(getSpecJob.getMetadata().getLogPath());
This ended up working great, I tested it out and it doesn't seem to be too much info to me, just a stacktrace of what went wrong in the get spec job. Here is an example:
airbyte-server | 2021-11-04 19:29:34 INFO i.a.s.ServerApp(migrateAllDefinitionsToContainSpec):316 - {workspace_app_root=/tmp/workspace/server/logs} - migrateAllDefinitionsToContainSpec - Failed to retrieve spec for Source Definition mk556 test 2. Logs: class LogRead { airbyte-server | logLines: [2021-11-04 19:29:31 INFO () TemporalAttemptExecution(get):116 - Executing worker wrapper. Airbyte version: dev, 2021-11-04 19:29:31 INFO () LineGobbler(voidCall):82 - Checking if mk556/airbyte-custom-connectors:0.1.2 exists..., 2021-11-04 19:29:31 INFO () LineGobbler(voidCall):82 - mk556/airbyte-custom-connectors:0.1.2 not found locally. Attempting to pull the image..., 2021-11-04 19:29:34 INFO () LineGobbler(voidCall):82 - Image does not exist., 2021-11-04 19:29:34 INFO () TemporalAttemptExecution(lambda$getWorkerThread$2):170 - Completing future exceptionally..., io.airbyte.workers.WorkerException: Error while getting spec from image mk556/airbyte-custom-connectors:0.1.2, at io.airbyte.workers.DefaultGetSpecWorker.run(DefaultGetSpecWorker.java:74) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.DefaultGetSpecWorker.run(DefaultGetSpecWorker.java:23) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at java.lang.Thread.run(Thread.java:832) [?:?], Caused by: io.airbyte.workers.WorkerException: Could not find image: mk556/airbyte-custom-connectors:0.1.2, at io.airbyte.workers.process.DockerProcessFactory.create(DockerProcessFactory.java:77) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.process.AirbyteIntegrationLauncher.spec(AirbyteIntegrationLauncher.java:52) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.DefaultGetSpecWorker.run(DefaultGetSpecWorker.java:44) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], ... 
3 more, 2021-11-04 19:29:34 INFO () TemporalAttemptExecution(get):137 - Stopping cancellation check scheduling..., 2021-11-04 19:29:34 WARN () POJOActivityTaskHandler$POJOActivityImplementation(execute):243 - Activity failure. ActivityId=8f0a08f5-04c1-3112-8854-658ba254ff71, activityType=Run, attempt=1, java.util.concurrent.ExecutionException: io.airbyte.workers.WorkerException: Error while getting spec from image mk556/airbyte-custom-connectors:0.1.2, at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?], at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) ~[?:?], at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:135) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.temporal.spec.SpecActivityImpl.run(SpecActivityImpl.java:66) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?], at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?], at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?], at java.lang.reflect.Method.invoke(Method.java:564) ~[?:?], at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:277) ~[temporal-sdk-1.0.4.jar:?], at io.temporal.internal.sync.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:216) ~[temporal-sdk-1.0.4.jar:?], at io.temporal.internal.sync.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:181) ~[temporal-sdk-1.0.4.jar:?], at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:192) ~[temporal-sdk-1.0.4.jar:?], at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:154) ~[temporal-sdk-1.0.4.jar:?], at 
io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73) ~[temporal-sdk-1.0.4.jar:?], at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?], at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?], at java.lang.Thread.run(Thread.java:832) [?:?], Caused by: io.airbyte.workers.WorkerException: Error while getting spec from image mk556/airbyte-custom-connectors:0.1.2, at io.airbyte.workers.DefaultGetSpecWorker.run(DefaultGetSpecWorker.java:74) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.DefaultGetSpecWorker.run(DefaultGetSpecWorker.java:23) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], ... 1 more, Caused by: io.airbyte.workers.WorkerException: Could not find image: mk556/airbyte-custom-connectors:0.1.2, at io.airbyte.workers.process.DockerProcessFactory.create(DockerProcessFactory.java:77) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.process.AirbyteIntegrationLauncher.spec(AirbyteIntegrationLauncher.java:52) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.DefaultGetSpecWorker.run(DefaultGetSpecWorker.java:44) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.DefaultGetSpecWorker.run(DefaultGetSpecWorker.java:23) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.30.28-alpha.jar:?], ... 1 more]
I had a question about how to view the actual event metadata that I push to Segment; see my Slack post here: https://airbytehq.slack.com/archives/C0229LM09T4/p1636049986231600
looks great!
What
Backfills specs into the connector definitions in the db.
How
On server startup, attempts to retrieve specs for connector definitions and write them to the db records. If it fails, it just tracks the failure in Segment and continues with server startup normally.
We plan to follow up with a later PR that makes server startup block on this backfilling logic and requires users to either fix their containers or force the upgrade by deleting the connections for which spec fetching fails.
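The control flow described above (try each definition, write the spec on success, track the failure and keep going otherwise) can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in this PR: SpecBackfillSketch, Definition, fetchSpec and trackFailure are hypothetical stand-ins for the real definition records, scheduler client and tracking client, and skipping definitions that already have a spec is an assumption as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the backfill flow; none of these types are Airbyte classes.
class SpecBackfillSketch {

  // Stand-in for a connector definition record.
  static final class Definition {
    final String name;
    final String dockerImage;
    String spec; // null until a spec has been written

    Definition(final String name, final String dockerImage) {
      this.name = name;
      this.dockerImage = dockerImage;
    }
  }

  // Tries to backfill every definition and returns the docker images that failed.
  // A failure is tracked and recorded, but never propagated, so startup can continue.
  static List<String> backfill(final List<Definition> definitions,
                               final Function<String, Optional<String>> fetchSpec,
                               final Consumer<String> trackFailure) {
    final List<String> failedImages = new ArrayList<>();
    for (final Definition def : definitions) {
      if (def.spec != null) {
        continue; // assumption: definitions that already have a spec are left alone
      }
      Optional<String> spec;
      try {
        spec = fetchSpec.apply(def.dockerImage);
      } catch (final RuntimeException e) {
        spec = Optional.empty(); // treat an exception like an unsuccessful job
      }
      if (spec.isPresent()) {
        def.spec = spec.get();
      } else {
        trackFailure.accept(def.dockerImage);
        failedImages.add(def.dockerImage);
      }
    }
    return failedImages;
  }
}
```

Returning the failed images instead of throwing keeps the "continue with startup" behaviour explicit, and it would give a later blocking version of this logic a single place to decide what to do with the failures.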