Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return whether configuration was updated as part of api response #21466

Merged
merged 12 commits into from
Jan 18, 2023
6 changes: 6 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4183,6 +4183,7 @@ components:
- createdAt
- endedAt
- succeeded
- didUpdateConfiguration
properties:
id:
type: string
Expand All @@ -4200,6 +4201,9 @@ components:
format: int64
succeeded:
type: boolean
didUpdateConfiguration:
type: boolean
default: false
logs:
$ref: "#/components/schemas/LogRead"
Pagination:
Expand Down Expand Up @@ -4231,6 +4235,8 @@ components:
- failed
message:
type: string
didUpdateConfiguration:
type: boolean
jobInfo:
$ref: "#/components/schemas/SynchronousJobRead"
ConnectionState:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardSyncInput;
Expand All @@ -17,6 +18,7 @@
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.protocol.models.Config;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.FailureHelper.ConnectorCommand;
Expand Down Expand Up @@ -140,6 +142,12 @@ private static Optional<AirbyteTraceMessage> getTraceMessageFromMessagesByType(f
.findFirst();
}

public static Boolean getDidControlMessageChangeConfig(final JsonNode initialConfigJson, final AirbyteControlConnectorConfigMessage configMessage) {
final Config newConfig = configMessage.getConfig();
final JsonNode newConfigJson = Jsons.jsonNode(newConfig);
return !initialConfigJson.equals(newConfigJson);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is OK, but calling out that this would would fail if the config JSON was re-ordered by the connector. However, the connector shouldn't just re-order it's config without a good reason...

Actually, jackson.equals() should be positionally independant!

}

public static Map<Type, List<AirbyteMessage>> getMessagesByType(final Process process, final AirbyteStreamFactory streamFactory, final int timeOut)
throws IOException {
final Map<Type, List<AirbyteMessage>> messagesByType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.LineGobbler;
Expand Down Expand Up @@ -64,13 +65,16 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche
public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException {
LineGobbler.startSection("CHECK");
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot));

try {
final JsonNode inputConfig = input.getConnectionConfiguration();
process = integrationLauncher.check(
jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(input.getConnectionConfiguration()));
Jsons.serialize(inputConfig));

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput()
.withOutputType(OutputType.CHECK_CONNECTION);

LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

Expand All @@ -82,17 +86,17 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa

if (input.getActorId() != null && input.getActorType() != null) {
final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> {
switch (input.getActorType()) {
case SOURCE -> connectorConfigUpdater.updateSource(
input.getActorId(),
configMessage.getConfig());
case DESTINATION -> connectorConfigUpdater.updateDestination(
input.getActorId(),
configMessage.getConfig());
}
});
if (optionalConfigMsg.isPresent() && WorkerUtils.getDidControlMessageChangeConfig(inputConfig, optionalConfigMsg.get())) {
switch (input.getActorType()) {
case SOURCE -> connectorConfigUpdater.updateSource(
input.getActorId(),
optionalConfigMsg.get().getConfig());
case DESTINATION -> connectorConfigUpdater.updateDestination(
input.getActorId(),
optionalConfigMsg.get().getConfig());
}
jobOutput.setDidUpdateConfiguration(true);
}
}

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -72,12 +73,15 @@ public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException {
ApmTraceUtils.addTagsToTrace(generateTraceTags(discoverSchemaInput, jobRoot));
try {
final JsonNode inputConfig = discoverSchemaInput.getConnectionConfiguration();
process = integrationLauncher.discover(
jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(discoverSchemaInput.getConnectionConfiguration()));
Jsons.serialize(inputConfig));

final ConnectorJobOutput jobOutput = new ConnectorJobOutput()
.withOutputType(OutputType.DISCOVER_CATALOG_ID);

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);
Expand All @@ -88,10 +92,12 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
.findFirst();

final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> connectorConfigUpdater.updateSource(
UUID.fromString(discoverSchemaInput.getSourceId()),
configMessage.getConfig()));
if (optionalConfigMsg.isPresent() && WorkerUtils.getDidControlMessageChangeConfig(inputConfig, optionalConfigMsg.get())) {
connectorConfigUpdater.updateSource(
UUID.fromString(discoverSchemaInput.getSourceId()),
optionalConfigMsg.get().getConfig());
jobOutput.setDidUpdateConfiguration(true);
}

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ properties:
format: uuid
spec:
existingJavaType: io.airbyte.protocol.models.ConnectorSpecification
didUpdateConfiguration:
description: A boolean indicating whether the configuration was updated during the job, e.g. if an AirbyteConfigControlMessage was received.
type: boolean
default: false
failureReason:
"$ref": FailureReason.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ public SynchronousJobRead getSynchronousJobRead(final SynchronousJobMetadata met
.createdAt(metadata.getCreatedAt())
.endedAt(metadata.getEndedAt())
.succeeded(metadata.isSucceeded())
.didUpdateConfiguration(metadata.getDidUpdateConfiguration())
.logs(getLogRead(metadata.getLogPath()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,18 @@ public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String
}

@VisibleForTesting
<T, U> SynchronousResponse<T> execute(final ConfigType configType,
final ConnectorJobReportingContext jobContext,
@Nullable final UUID connectorDefinitionId,
final Supplier<TemporalResponse<U>> executor,
final Function<U, T> outputMapper,
final UUID workspaceId) {
<T> SynchronousResponse<T> execute(final ConfigType configType,
final ConnectorJobReportingContext jobContext,
@Nullable final UUID connectorDefinitionId,
final Supplier<TemporalResponse<ConnectorJobOutput>> executor,
final Function<ConnectorJobOutput, T> outputMapper,
final UUID workspaceId) {
final long createdAt = Instant.now().toEpochMilli();
final UUID jobId = jobContext.jobId();
try {
track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null);
final TemporalResponse<U> temporalResponse = executor.get();
final Optional<U> jobOutput = temporalResponse.getOutput();
final TemporalResponse<ConnectorJobOutput> temporalResponse = executor.get();
final Optional<ConnectorJobOutput> jobOutput = temporalResponse.getOutput();
Comment on lines -183 to +184
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed some generics since they weren't necessary (each job returns a ConnectorJobOutput) and were making it harder to work with

final T mappedOutput = jobOutput.map(outputMapper).orElse(null);
final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;

Expand All @@ -194,6 +194,7 @@ <T, U> SynchronousResponse<T> execute(final ConfigType configType,
final long endedAt = Instant.now().toEpochMilli();
return SynchronousResponse.fromTemporalResponse(
temporalResponse,
jobOutput.orElse(null),
mappedOutput,
jobId,
configType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ public class SynchronousJobMetadata {
private final long createdAt;
private final long endedAt;
private final boolean succeeded;
private final boolean didUpdateConfiguration;

private final Path logPath;

public static SynchronousJobMetadata fromJobMetadata(final JobMetadata jobMetadata,
final UUID id,
final ConfigType configType,
final UUID configId,
final boolean didUpdateConfiguration,
final long createdAt,
final long endedAt) {
return new SynchronousJobMetadata(
Expand All @@ -37,6 +39,7 @@ public static SynchronousJobMetadata fromJobMetadata(final JobMetadata jobMetada
createdAt,
endedAt,
jobMetadata.isSucceeded(),
didUpdateConfiguration,
jobMetadata.getLogPath());
}

Expand All @@ -46,13 +49,15 @@ public SynchronousJobMetadata(final UUID id,
final long createdAt,
final long endedAt,
final boolean succeeded,
final boolean didUpdateConfiguration,
final Path logPath) {
this.id = id;
this.configType = configType;
this.configId = configId;
this.createdAt = createdAt;
this.endedAt = endedAt;
this.succeeded = succeeded;
this.didUpdateConfiguration = didUpdateConfiguration;
this.logPath = logPath;
}

Expand Down Expand Up @@ -80,6 +85,10 @@ public boolean isSucceeded() {
return succeeded;
}

public boolean getDidUpdateConfiguration() {
return didUpdateConfiguration;
}

public Path getLogPath() {
return logPath;
}
Expand All @@ -93,13 +102,14 @@ public boolean equals(final Object o) {
return false;
}
final SynchronousJobMetadata that = (SynchronousJobMetadata) o;
return createdAt == that.createdAt && endedAt == that.endedAt && succeeded == that.succeeded && Objects.equals(id, that.id)
return createdAt == that.createdAt && endedAt == that.endedAt && succeeded == that.succeeded
&& didUpdateConfiguration == that.didUpdateConfiguration && Objects.equals(id, that.id)
&& configType == that.configType && Objects.equals(configId, that.configId) && Objects.equals(logPath, that.logPath);
}

@Override
public int hashCode() {
return Objects.hash(id, configType, configId, createdAt, endedAt, succeeded, logPath);
return Objects.hash(id, configType, configId, createdAt, endedAt, succeeded, didUpdateConfiguration, logPath);
}

@Override
Expand All @@ -111,6 +121,7 @@ public String toString() {
", createdAt=" + createdAt +
", endedAt=" + endedAt +
", succeeded=" + succeeded +
", didUpdateConfiguration=" + didUpdateConfiguration +
", logPath=" + logPath +
'}';
}
Expand All @@ -119,6 +130,7 @@ public static SynchronousJobMetadata mock(final ConfigType configType) {
final long now = Instant.now().toEpochMilli();
final UUID configId = null;
final boolean succeeded = true;
final boolean didUpdateConfiguration = false;
final Path logPath = null;

return new SynchronousJobMetadata(
Expand All @@ -128,6 +140,7 @@ public static SynchronousJobMetadata mock(final ConfigType configType) {
now,
now,
succeeded,
didUpdateConfiguration,
logPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.server.scheduler;

import io.airbyte.commons.temporal.TemporalResponse;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.JobConfig.ConfigType;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nullable;

public class SynchronousResponse<T> {

Expand All @@ -23,7 +25,8 @@ public static <T> SynchronousResponse<T> success(final T output, final Synchrono
}

public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<U> temporalResponse,
final T output,
@Nullable final ConnectorJobOutput jobOutput,
@Nullable final T responseOutput,
final UUID id,
final ConfigType configType,
final UUID configId,
Expand All @@ -35,9 +38,10 @@ public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalR
id,
configType,
configId,
jobOutput != null ? jobOutput.getDidUpdateConfiguration() : false,
createdAt,
endedAt);
return new SynchronousResponse<>(output, metadata);
return new SynchronousResponse<>(responseOutput, metadata);
}

public SynchronousResponse(final T output, final SynchronousJobMetadata metadata) {
Expand Down
Loading