fix: refresh actor configuration and state between sync attempts #21629

Merged
merged 30 commits into from
Feb 1, 2023
Changes from 23 commits
Commits
c65c003
add AttemptSyncConfig, move info out of JobSyncConfig
pedroslopez Jan 19, 2023
5dbea60
get build working
pedroslopez Jan 19, 2023
7c82560
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Jan 20, 2023
5aa96b9
add db migration
pedroslopez Jan 20, 2023
2bfe226
load config when building attempts
pedroslopez Jan 20, 2023
6c4e0cd
persist AttemptSyncConfig
pedroslopez Jan 20, 2023
0ea4e4b
it compiles
pedroslopez Jan 20, 2023
8d97db0
fix job persistence test
pedroslopez Jan 20, 2023
2349a44
implement submitSync with attempt config
pedroslopez Jan 20, 2023
de41dc6
fix TemporalClientTest
pedroslopez Jan 20, 2023
74578dc
reorganizing some code
pedroslopez Jan 20, 2023
fbaf075
add GenerateInputActivity test
pedroslopez Jan 20, 2023
fef3e5d
verify AttemptSyncConfig is persisted
pedroslopez Jan 20, 2023
532e8fd
add test for persistence changes
pedroslopez Jan 20, 2023
f31e201
add test for getAttemptByNumber
pedroslopez Jan 20, 2023
6431d5c
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Jan 24, 2023
1411e04
use apis rather than direct db access
pedroslopez Jan 25, 2023
2f2eaa3
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Jan 25, 2023
9ecd1b2
fix compatibility with master
pedroslopez Jan 25, 2023
773d4b6
copy update
pedroslopez Jan 25, 2023
f5ed8de
fix tests for allowed hosts addition
pedroslopez Jan 25, 2023
9dbf869
remove debug logging
pedroslopez Jan 25, 2023
1541875
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Jan 25, 2023
d339247
fix: handle when state is not set on the connection
pedroslopez Jan 28, 2023
b393663
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Jan 28, 2023
90a1d95
fix: handle unset state (on the server this time)
pedroslopez Jan 28, 2023
4f4f242
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Jan 28, 2023
3a4034d
set state type when converting to internal representation
pedroslopez Feb 1, 2023
20af23d
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Feb 1, 2023
8c7f55b
Merge branch 'master' into pedroslopez/attempt-sync-config
pedroslopez Feb 1, 2023
45 changes: 45 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -2214,6 +2214,26 @@ paths:
              application/json:
                schema:
                  $ref: "#/components/schemas/InternalOperationResult"
  /v1/attempt/save_sync_config:
    post:
      tags:
        - attempt
        - internal
      summary: For worker to save the AttemptSyncConfig for an attempt.
      operationId: saveSyncConfig
      requestBody:
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/SaveAttemptSyncConfigRequestBody"
        required: true
      responses:
        "200":
          description: Successful Operation
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/InternalOperationResult"

components:
  securitySchemes:
@@ -5022,6 +5042,31 @@ components:
          type: array
          items:
            $ref: "#/components/schemas/AttemptStreamStats"
    AttemptSyncConfig:
      type: object
      required:
        - sourceConfiguration
        - destinationConfiguration
      properties:
        sourceConfiguration:
          $ref: "#/components/schemas/SourceConfiguration"
        destinationConfiguration:
          $ref: "#/components/schemas/DestinationConfiguration"
        state:
          $ref: "#/components/schemas/ConnectionState"
    SaveAttemptSyncConfigRequestBody:
      type: object
      required:
        - jobId
        - attemptNumber
        - syncConfig
      properties:
        jobId:
          $ref: "#/components/schemas/JobId"
        attemptNumber:
          $ref: "#/components/schemas/AttemptNumber"
        syncConfig:
          $ref: "#/components/schemas/AttemptSyncConfig"
    InternalOperationResult:
      type: object
      required:
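
For illustration, a request to the new `/v1/attempt/save_sync_config` endpoint might be assembled roughly as follows. This is a minimal sketch, not the worker's actual client code: the class name, the helper methods, and the `http://localhost:8001/api/` root are all hypothetical, the JSON is concatenated by hand to keep the example dependency-free, and the optional `state` field is left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch: builds (but does not send) a save_sync_config request.
class SaveSyncConfigRequestSketch {

  // Produces a body carrying the three required fields of SaveAttemptSyncConfigRequestBody.
  // The configuration arguments are expected to already be valid JSON objects.
  static String buildBody(long jobId, int attemptNumber, String sourceConfigJson, String destinationConfigJson) {
    return "{"
        + "\"jobId\":" + jobId + ","
        + "\"attemptNumber\":" + attemptNumber + ","
        + "\"syncConfig\":{"
        + "\"sourceConfiguration\":" + sourceConfigJson + ","
        + "\"destinationConfiguration\":" + destinationConfigJson
        + "}}";
  }

  // apiRoot is an assumed deployment-specific base URL and must end with a slash.
  static HttpRequest buildRequest(URI apiRoot, String body) {
    return HttpRequest.newBuilder(apiRoot.resolve("v1/attempt/save_sync_config"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    String body = buildBody(10002L, 1, "{\"source_key\":\"source_val\"}", "{\"destination_key\":\"destination_val\"}");
    HttpRequest request = buildRequest(URI.create("http://localhost:8001/api/"), body);
    System.out.println(request.method() + " " + request.uri());
    System.out.println(body);
  }
}
```

In the real codebase the generated API client would presumably take care of serialization; the sketch only makes the shape of the payload concrete.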

@@ -82,7 +82,8 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_MIGRATION_VERSION = "0.40.27.001";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.40.27.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.40.28.001";

@BeforeEach
void setup() {
@@ -147,10 +148,10 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals(CURRENT_JOBS_MIGRATION_VERSION, jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
assertEquals(CURRENT_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals(CURRENT_CONFIGS_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());

assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get());

@@ -5,6 +5,7 @@
package io.airbyte.commons.server.converters;

import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.generated.AttemptSyncConfig;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleData;
@@ -22,6 +23,12 @@
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.config.State;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.workers.helper.StateConverter;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

public class ApiPojoConverters {
@@ -42,6 +49,36 @@ public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefReso
.collect(Collectors.toList()));
}

  public static io.airbyte.config.AttemptSyncConfig attemptSyncConfigToInternal(final AttemptSyncConfig attemptSyncConfig) {
    if (attemptSyncConfig == null) {
      return null;
    }
    final StateWrapper stateWrapper = StateConverter.toInternal(attemptSyncConfig.getState());
    final io.airbyte.config.State state = StateMessageHelper.getState(stateWrapper);

    return new io.airbyte.config.AttemptSyncConfig()
        .withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
        .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
        .withState(state);
  }

  public static io.airbyte.api.client.model.generated.AttemptSyncConfig attemptSyncConfigToClient(final io.airbyte.config.AttemptSyncConfig attemptSyncConfig,
                                                                                                   final UUID connectionId,
                                                                                                   final boolean useStreamCapableState) {
    if (attemptSyncConfig == null) {
      return null;
    }

    final State state = attemptSyncConfig.getState();
    final Optional<StateWrapper> optStateWrapper = state != null ? StateMessageHelper.getTypedState(
        state.getState(), useStreamCapableState) : Optional.empty();

    return new io.airbyte.api.client.model.generated.AttemptSyncConfig()
        .sourceConfiguration(attemptSyncConfig.getSourceConfiguration())
        .destinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
        .state(StateConverter.toClient(connectionId, optStateWrapper.orElse(null)));
  }

public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) {
if (actorDefResourceReqs == null) {
return null;
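
The null handling in `attemptSyncConfigToClient` above (a state that may be unset, mapped through an `Optional` and unwrapped with `orElse(null)`) can be shown in isolation. The sketch below uses hypothetical stand-in types, not Airbyte's `State`, `StateWrapper`, or `StateMessageHelper`, and its `getTypedState` is an invented simplification of the real helper.

```java
import java.util.Optional;

// Hypothetical sketch of the "state may be unset" conversion pattern.
class NullSafeStateSketch {

  // Stand-in for the internal state holder.
  static final class State {
    final String payload;
    State(String payload) { this.payload = payload; }
  }

  // Stand-in for a typed state wrapper.
  static final class TypedState {
    final String type;
    final String payload;
    TypedState(String type, String payload) { this.type = type; this.payload = payload; }
  }

  // Invented simplification: an empty or missing payload yields no typed state.
  static Optional<TypedState> getTypedState(String payload) {
    if (payload == null || payload.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(new TypedState("LEGACY", payload));
  }

  // Mirrors the shape of the converter: guard against a null state, then unwrap to null when empty.
  static TypedState toClientState(State state) {
    final Optional<TypedState> typed = state != null ? getTypedState(state.payload) : Optional.empty();
    return typed.orElse(null);
  }

  public static void main(String[] args) {
    System.out.println(toClientState(null) == null);
    System.out.println(toClientState(new State("{\"cursor\":1}")).type);
  }
}
```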

@@ -5,8 +5,10 @@
package io.airbyte.commons.server.handlers;

import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SaveStatsRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.JobPersistence;
@@ -63,4 +65,17 @@ public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody)
return new InternalOperationResult().succeeded(true);
}

  public InternalOperationResult saveSyncConfig(final SaveAttemptSyncConfigRequestBody requestBody) {
    try {
      jobPersistence.writeAttemptSyncConfig(
          requestBody.getJobId(),
          requestBody.getAttemptNumber(),
          ApiPojoConverters.attemptSyncConfigToInternal(requestBody.getSyncConfig()));
    } catch (final IOException ioe) {
      LOGGER.error("IOException when saving AttemptSyncConfig for attempt;", ioe);
      return new InternalOperationResult().succeeded(false);
    }
    return new InternalOperationResult().succeeded(true);
  }

}
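
The handler's contract, as far as the diff shows, is that a persistence `IOException` is logged and reported as `succeeded(false)` rather than propagated to the caller. A minimal standalone sketch of that pattern follows; `AttemptConfigStore` and the boolean return value are hypothetical simplifications of `JobPersistence` and `InternalOperationResult`.

```java
import java.io.IOException;

// Hypothetical sketch of the "catch IOException, report failure" handler pattern.
class SaveSyncConfigHandlerSketch {

  // Stand-in for the persistence layer; only the one write call is modelled.
  interface AttemptConfigStore {
    void write(long jobId, int attemptNumber, String syncConfig) throws IOException;
  }

  // Returns true on success and false when the store throws, instead of rethrowing.
  static boolean saveSyncConfig(AttemptConfigStore store, long jobId, int attemptNumber, String syncConfig) {
    try {
      store.write(jobId, attemptNumber, syncConfig);
    } catch (final IOException ioe) {
      System.err.println("IOException when saving sync config: " + ioe.getMessage());
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    AttemptConfigStore working = (jobId, attemptNumber, syncConfig) -> { };
    AttemptConfigStore failing = (jobId, attemptNumber, syncConfig) -> {
      throw new IOException("database unavailable");
    };
    System.out.println(saveSyncConfig(working, 10002L, 1, "{}"));
    System.out.println(saveSyncConfig(failing, 10002L, 1, "{}"));
  }
}
```

One consequence of this design is that the caller has to check the result flag: a failed save is not visible as an exception.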

@@ -12,9 +12,18 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.api.model.generated.AttemptSyncConfig;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.GlobalState;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.persistence.job.JobPersistence;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -26,6 +35,7 @@ class AttemptHandlerTest {
JobPersistence jobPersistence;
AttemptHandler handler;

private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final long JOB_ID = 10002L;
private static final int ATTEMPT_NUMBER = 1;

@@ -39,14 +49,14 @@ public void init() {

@Test
void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {
String workflowId = UUID.randomUUID().toString();
final String workflowId = UUID.randomUUID().toString();

final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);

SetWorkflowInAttemptRequestBody requestBody =
final SetWorkflowInAttemptRequestBody requestBody =
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
.processingTaskQueue(PROCESSING_TASK_QUEUE);

@@ -63,7 +73,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {

@Test
void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
String workflowId = UUID.randomUUID().toString();
final String workflowId = UUID.randomUUID().toString();

doThrow(IOException.class).when(jobPersistence).setAttemptTemporalWorkflowInfo(anyLong(), anyInt(),
any(), any());
@@ -73,7 +83,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);

SetWorkflowInAttemptRequestBody requestBody =
final SetWorkflowInAttemptRequestBody requestBody =
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
.processingTaskQueue(PROCESSING_TASK_QUEUE);

@@ -88,4 +98,38 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
assertEquals(PROCESSING_TASK_QUEUE, queueCapture.getValue());
}

@Test
void testInternalHandlerSetsAttemptSyncConfig() throws Exception {
final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
final ArgumentCaptor<io.airbyte.config.AttemptSyncConfig> attemptSyncConfigCapture =
ArgumentCaptor.forClass(io.airbyte.config.AttemptSyncConfig.class);

final JsonNode sourceConfig = Jsons.jsonNode(Map.of("source_key", "source_val"));
final JsonNode destinationConfig = Jsons.jsonNode(Map.of("destination_key", "destination_val"));
final ConnectionState state = new ConnectionState()
.connectionId(CONNECTION_ID)
.stateType(ConnectionStateType.GLOBAL)
.streamState(null)
.globalState(new GlobalState().sharedState(Jsons.jsonNode(Map.of("state_key", "state_val"))));

final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.destinationConfiguration(destinationConfig)
.sourceConfiguration(sourceConfig)
.state(state);

final SaveAttemptSyncConfigRequestBody requestBody =
new SaveAttemptSyncConfigRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).syncConfig(attemptSyncConfig);

assertTrue(handler.saveSyncConfig(requestBody).getSucceeded());

Mockito.verify(jobPersistence).writeAttemptSyncConfig(jobIdCapture.capture(), attemptNumberCapture.capture(), attemptSyncConfigCapture.capture());

final io.airbyte.config.AttemptSyncConfig expectedAttemptSyncConfig = ApiPojoConverters.attemptSyncConfigToInternal(attemptSyncConfig);

assertEquals(ATTEMPT_NUMBER, attemptNumberCapture.getValue());
assertEquals(JOB_ID, jobIdCapture.getValue());
assertEquals(expectedAttemptSyncConfig, attemptSyncConfigCapture.getValue());
}

}

@@ -157,7 +157,7 @@ private static AttemptRead toAttemptRead(final Attempt a) {
}

private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) {
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, null, timestamps, timestamps, timestamps);
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, null, status, null, null, timestamps, timestamps, timestamps);
}

@BeforeEach

@@ -16,6 +16,7 @@
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobDiscoverCatalogConfig;
@@ -372,7 +373,11 @@ public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobI
() -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input));
}

public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) {
public TemporalResponse<StandardSyncOutput> submitSync(final long jobId,
final int attempt,
final JobSyncConfig config,
final AttemptSyncConfig attemptConfig,
final UUID connectionId) {
Comment on lines +376 to +380
Contributor Author


I made the necessary changes here because we're building a sync input, but I couldn't really find where the temporal client's submitSync method actually runs. To me it seems like in practice we're using the GenerateInputActivity to do this.

Maybe @benmoriceau knows more about why this is here?

Contributor Author


Notably, this sync input is missing other recent additions, like source/destination actor IDs and allowed hosts

Contributor


I think this is a legacy from before the ConnectionManagerWorkflow existed.
The WorkerRun (which calls that method upon creation) seems to be useful only for getting the jobRoot...

This way of starting a Temporal job is still useful for check connection and discover schema.

final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);

final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig()
@@ -393,11 +398,11 @@ public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final i
.withNamespaceDefinition(config.getNamespaceDefinition())
.withNamespaceFormat(config.getNamespaceFormat())
.withPrefix(config.getPrefix())
.withSourceConfiguration(config.getSourceConfiguration())
.withDestinationConfiguration(config.getDestinationConfiguration())
.withSourceConfiguration(attemptConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptConfig.getDestinationConfiguration())
.withOperationSequence(config.getOperationSequence())
.withCatalog(config.getConfiguredAirbyteCatalog())
.withState(config.getState())
.withState(attemptConfig.getState())
.withResourceRequirements(config.getResourceRequirements())
.withSourceResourceRequirements(config.getSourceResourceRequirements())
.withDestinationResourceRequirements(config.getDestinationResourceRequirements());
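
The hunk above splits the sync input's sources in two: job-level settings still come from `JobSyncConfig`, while the source configuration, destination configuration, and state now come from the per-attempt `AttemptSyncConfig`. A minimal sketch of that split, using hypothetical trimmed-down stand-ins rather than the real `JobSyncConfig`, `AttemptSyncConfig`, and `StandardSyncInput` classes, might look like this:

```java
// Hypothetical sketch: job-level fields stay fixed, attempt-level fields are refreshed per attempt.
class SyncInputSketch {

  // Stand-in for the job-level config, created once per job.
  static final class JobConfig {
    final String prefix;
    final String catalog;
    JobConfig(String prefix, String catalog) { this.prefix = prefix; this.catalog = catalog; }
  }

  // Stand-in for the attempt-level config, rebuilt for every attempt.
  static final class AttemptConfig {
    final String sourceConfiguration;
    final String destinationConfiguration;
    final String state; // may be null when no state has been saved yet
    AttemptConfig(String sourceConfiguration, String destinationConfiguration, String state) {
      this.sourceConfiguration = sourceConfiguration;
      this.destinationConfiguration = destinationConfiguration;
      this.state = state;
    }
  }

  // Stand-in for the combined sync input handed to the workflow.
  static final class SyncInput {
    final String prefix;
    final String catalog;
    final String sourceConfiguration;
    final String destinationConfiguration;
    final String state;
    SyncInput(String prefix, String catalog, String sourceConfiguration, String destinationConfiguration, String state) {
      this.prefix = prefix;
      this.catalog = catalog;
      this.sourceConfiguration = sourceConfiguration;
      this.destinationConfiguration = destinationConfiguration;
      this.state = state;
    }
  }

  // Job-level values come from the job config; actor configuration and state come from the attempt config.
  static SyncInput buildInput(JobConfig job, AttemptConfig attempt) {
    return new SyncInput(job.prefix, job.catalog,
        attempt.sourceConfiguration, attempt.destinationConfiguration, attempt.state);
  }

  public static void main(String[] args) {
    JobConfig job = new JobConfig("raw_", "catalog-v1");
    SyncInput first = buildInput(job, new AttemptConfig("{\"token\":\"old\"}", "{}", null));
    SyncInput retry = buildInput(job, new AttemptConfig("{\"token\":\"new\"}", "{}", "{\"cursor\":42}"));
    System.out.println(first.sourceConfiguration + " -> " + retry.sourceConfiguration);
  }
}
```

With this shape, a retry within the same job can pick up a refreshed actor configuration and the latest saved state, which appears to be the intent stated in the PR title.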

@@ -31,6 +31,7 @@
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobCheckConnectionConfig;
@@ -271,26 +272,27 @@ void testSubmitSync() {
final JobSyncConfig syncConfig = new JobSyncConfig()
.withSourceDockerImage(IMAGE_NAME1)
.withDestinationDockerImage(IMAGE_NAME2)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject())
.withOperationSequence(List.of())
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());
final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject());
final StandardSyncInput input = new StandardSyncInput()
.withNamespaceDefinition(syncConfig.getNamespaceDefinition())
.withNamespaceFormat(syncConfig.getNamespaceFormat())
.withPrefix(syncConfig.getPrefix())
.withSourceConfiguration(syncConfig.getSourceConfiguration())
.withDestinationConfiguration(syncConfig.getDestinationConfiguration())
.withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
.withOperationSequence(syncConfig.getOperationSequence())
.withCatalog(syncConfig.getConfiguredAirbyteCatalog())
.withState(syncConfig.getState());
.withState(attemptSyncConfig.getState());

final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_ID)
.withDockerImage(IMAGE_NAME2);

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input, CONNECTION_ID);
verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.SYNC));
}
@@ -340,15 +342,17 @@ void testforceCancelConnection() {
doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class));
when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow);

final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject());

final JobSyncConfig syncConfig = new JobSyncConfig()
.withSourceDockerImage(IMAGE_NAME1)
.withDestinationDockerImage(IMAGE_NAME2)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject())
.withOperationSequence(List.of())
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
temporalClient.forceDeleteWorkflow(CONNECTION_ID);

verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID);