fix(platform): use updated configuration after check during syncs #21899
final SyncJobCheckConnectionInputs checkInputs;
if (generateCheckInputVersion < GENERATE_CHECK_INPUT_CURRENT_VERSION && jobInputs != null) {
  checkInputs = getCheckConnectionInputFromSync(jobInputs);
} else {
  checkInputs = getCheckConnectionInput();
}
Previous versions of the workflow will already have retrieved the sync input, which we can use to build the check inputs as was done before.
For new workflow executions, we instead run a dedicated activity to generate the check inputs.
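To make the branching concrete, here is a minimal stand-alone sketch of the selection logic. It does not use the Temporal SDK: the class name, the method, and the returned labels are hypothetical, and the two constants only mirror `Workflow.DEFAULT_VERSION` and `GENERATE_CHECK_INPUT_CURRENT_VERSION`. It assumes that `Workflow.getVersion` yields the default version for executions that started before the change and the current version for new ones.

```java
// Stand-alone sketch, no Temporal SDK involved. All names are hypothetical.
final class CheckInputSelection {

  // Mirrors Workflow.DEFAULT_VERSION, which is -1 in the Temporal Java SDK.
  static final int DEFAULT_VERSION = -1;
  // Mirrors GENERATE_CHECK_INPUT_CURRENT_VERSION from the diff.
  static final int CURRENT_VERSION = 1;

  /**
   * Returns a label describing where the check inputs come from.
   * Old executions that already hold the sync input derive the check inputs from it;
   * everything else goes through the dedicated check-input activity.
   */
  static String checkInputSource(final int version, final boolean hasJobInputs) {
    if (version < CURRENT_VERSION && hasJobInputs) {
      return "derived-from-sync-input";
    }
    return "dedicated-check-input-activity";
  }
}
```

Under these assumptions, `checkInputSource(DEFAULT_VERSION, true)` takes the old path, while `checkInputSource(CURRENT_VERSION, false)` takes the new one. An old execution without job inputs also falls through to the dedicated activity, matching the `jobInputs != null` guard in the diff.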
final int generateCheckInputVersion =
    Workflow.getVersion(GENERATE_CHECK_INPUT_TAG, Workflow.DEFAULT_VERSION, GENERATE_CHECK_INPUT_CURRENT_VERSION);

GeneratedJobInput jobInputs = null;
if (generateCheckInputVersion < GENERATE_CHECK_INPUT_CURRENT_VERSION) {
  jobInputs = getJobInput();
}

reportJobStarting(connectionUpdaterInput.getConnectionId());
StandardSyncOutput standardSyncOutput = null;

try {
-  final SyncCheckConnectionFailure syncCheckConnectionFailure = checkConnections(jobInputs);
+  final SyncCheckConnectionFailure syncCheckConnectionFailure = checkConnections(getJobRunConfig(), jobInputs);
  if (syncCheckConnectionFailure.isFailed()) {
    final StandardSyncOutput checkFailureOutput = syncCheckConnectionFailure.buildFailureOutput();
    workflowState.setFailed(getFailStatus(checkFailureOutput));
    reportFailure(connectionUpdaterInput, checkFailureOutput, FailureCause.CONNECTION);
  } else {
    if (generateCheckInputVersion >= GENERATE_CHECK_INPUT_CURRENT_VERSION) {
      jobInputs = getJobInput();
    }
This is where we move the activity that generates the SyncWorkflow inputs to after the potential checks. For older workflow versions, the activity keeps its old position.
We do this so that any config updates that happen during check are freshly retrieved before running the sync.
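The effect of the reordering can be illustrated with a small stand-alone simulation. Everything in it is hypothetical (the `ConfigStore` class, the token values, the method names); it only assumes that the check step may refresh a token as a side effect and that the sync input is a snapshot of the config at the moment it is generated.

```java
// Stand-alone simulation of the activity ordering. All names and values are hypothetical.
final class SyncOrderingSketch {

  // Mirrors GENERATE_CHECK_INPUT_CURRENT_VERSION from the diff.
  static final int CURRENT_VERSION = 1;

  /** Stand-in for the persisted connector configuration. */
  static final class ConfigStore {
    private String token = "token-v1";

    String read() {
      return token;
    }

    /** Simulates an OAuth token refresh that happens during check. */
    void refresh() {
      token = "token-v2";
    }
  }

  /** Returns the token the sync would run with for a given workflow version. */
  static String tokenUsedBySync(final int version) {
    final ConfigStore store = new ConfigStore();
    String syncInput = null;

    if (version < CURRENT_VERSION) {
      // Old position: the sync input is snapshotted before check runs.
      syncInput = store.read();
    }

    // Check runs and refreshes the token as a side effect.
    store.refresh();

    if (version >= CURRENT_VERSION) {
      // New position: the sync input is generated after check, so it sees the refresh.
      syncInput = store.read();
    }
    return syncInput;
  }
}
```

In this model, `tokenUsedBySync(-1)` returns the stale `token-v1`, while `tokenUsedBySync(1)` returns the refreshed `token-v2`.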
  return sourceLauncherConfig;
}

private IntegrationLauncherConfig getDestinationIntegrationLauncherConfig(final long jobId,
Most of the changes in this file pull out common logic used for both SyncWorkflow and Check input generation.
private static final String GENERATE_CHECK_INPUT_TAG = "generate_check_input";
private static final int GENERATE_CHECK_INPUT_CURRENT_VERSION = 1;
... time to version I guess!
If you didn't version, I think that running syncs would fail, but perhaps only before the sync stage, not during or afterwards? If that's the case, and they retry, that's not so bad?
Yeah, that sounds right. Avoiding the failure wasn't too bad here with the version though so seemed worth it to keep history compatibility. FWIW there's also a test that verifies history can be replayed that not versioning would break 😄
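As a rough illustration of why an unversioned change would break history replay, here is a toy model. It is not how Temporal implements replay; it only assumes that replay requires the code to request activities in the same order as the recorded history. The class and the activity labels are hypothetical and loosely named after the methods in the diff.

```java
import java.util.List;

// Toy model of history replay. Not the Temporal implementation; all names are hypothetical.
final class ReplaySketch {

  // Mirrors GENERATE_CHECK_INPUT_CURRENT_VERSION from the diff.
  static final int CURRENT_VERSION = 1;

  /** Order in which the workflow code requests activities for a given version. */
  static List<String> activityOrder(final int version) {
    return version < CURRENT_VERSION
        ? List.of("getJobInput", "check", "sync")
        : List.of("getCheckConnectionInput", "check", "getJobInput", "sync");
  }

  /** True when the recorded history is a prefix of what the code would request. */
  static boolean canReplay(final List<String> recordedHistory, final List<String> requestedByCode) {
    if (requestedByCode.size() < recordedHistory.size()) {
      return false;
    }
    return requestedByCode.subList(0, recordedHistory.size()).equals(recordedHistory);
  }
}
```

A history recorded as `getJobInput`, `check` replays against `activityOrder(-1)` but not against `activityOrder(1)`. Keeping the old order behind the version check is what lets in-flight workflows continue.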
@@ -36,6 +37,18 @@ class SyncInputWithAttemptNumber {

  }

  @Data
  @NoArgsConstructor
  @AllArgsConstructor
@AllArgsConstructor - today I learned
if (!isLastJobOrAttemptFailure) {
  log.info("SOURCE CHECK: Skipped, last attempt was not a failure");
  log.info("DESTINATION CHECK: Skipped, last attempt was not a failure");
  return checkFailure;
}
I think I'm confused by this conditional - if we are skipping CHECKS because the previous job/attempt didn't fail, why would we return a failure? Or is checkFailure initialized with isFailed(false)?
This was already the case before; I just moved this higher up to be clearer / more intentional about it.
Yes, a SyncCheckConnectionFailure is always returned by this method. When an actual failure happens, checkFailure.setFailureOrigin and checkFailure.setFailureOutput are called. The SyncCheckConnectionFailure class has a method .isFailed() that returns whether it actually failed, i.e. whether these are set.
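A minimal sketch of that pattern, for readers who find the naming confusing. The class name and the `String` field types are hypothetical simplifications (the real class presumably holds richer types); the only behaviour taken from the description above is that the object is always returned and `isFailed()` depends on whether the failure fields were set.

```java
// Simplified sketch of a result object that is always returned. Hypothetical names and types.
final class CheckResultSketch {

  private String failureOrigin; // hypothetical: stands in for the real failure-origin type
  private String failureOutput; // hypothetical: stands in for the real failure-output type

  void setFailureOrigin(final String failureOrigin) {
    this.failureOrigin = failureOrigin;
  }

  void setFailureOutput(final String failureOutput) {
    this.failureOutput = failureOutput;
  }

  /** A freshly created result is not a failure; it becomes one once both fields are set. */
  boolean isFailed() {
    return failureOrigin != null && failureOutput != null;
  }
}
```

With this shape, returning the object early when checks are skipped is safe: nothing was set, so `isFailed()` is false.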
Can we add a comment on the method? This is confusing.
Ok, this was definitely confusing :D
Renamed SyncCheckConnectionFailure to SyncCheckConnectionResult to be more clear that it doesn't always represent a failure.
High-level comment - I think it would be worthwhile to break this down into smaller PRs, to (1) reduce the likelihood that something goes wrong; (2) minimise the impact if something does go wrong.
@mfsiega-airbyte whoops, 100% - I did separate this into a smaller set of changes but when I merged the other piece this one now includes everything. Sorting that out now.
@mfsiega-airbyte OK - this should be more manageable now
superseded by airbytehq/airbyte-platform#91
What
fixes #20912
This PR builds on the changes introduced in #21629. While the aforementioned PR addressed using updated state/config values between attempts, this PR solves the issue of updating configuration within an attempt. This can happen when, for example, an OAuth token is refreshed during the check operation that can run prior to the SyncWorkflow.
How
This implements the second part of the solution as laid out in the tech spec, changing when we're generating the sync input and how we are generating the check inputs.
Before this change, the SyncWorkflow input was being generated at the start of the ConnectionManagerWorkflow, and was then converted into Check inputs if needed. The activities look something like this:
(these are snippets of diagrams, the full views can be found in the tech spec)
After this change, we separate the check input generation into its own activity method. Then, we can move the SyncWorkflow input generation to after the check activity runs. Because the previous PR updated the SyncWorkflow input generation activity to fetch latest configs, this will take into consideration any changes that happened during check.
This ends up looking something like this:
The change is versioned, so ConnectionManagerWorkflows that executed before this change will still follow the old path/behavior.
🚨 User Impact 🚨
No breaking change. Configs updated during check will now correctly be retrieved for the sync.