diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index 9ea81363c7af2..fadbeaad52072 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -24,6 +24,8 @@ import io.airbyte.config.ReplicationOutput; import io.airbyte.config.StandardSyncInput; import io.airbyte.featureflag.FeatureFlagClient; +import io.airbyte.featureflag.FieldSelectionEnabled; +import io.airbyte.featureflag.Workspace; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.metrics.lib.MetricClientFactory; import io.airbyte.metrics.lib.MetricEmittingApps; @@ -53,6 +55,7 @@ import java.nio.file.Path; import java.util.Map; import java.util.Optional; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,6 +161,13 @@ public Optional runJob() throws Exception { sourceLauncherConfig.getDockerImage()); log.info("Setting up replication worker..."); + final UUID workspaceId = syncInput.getWorkspaceId(); + // NOTE: we apply field selection if the feature flag client says so (recommended) or the old + // environment-variable flags say so (deprecated). + // The latter FeatureFlagHelper will be removed once the flag client is fully deployed. + final boolean fieldSelectionEnabled = workspaceId != null && + (featureFlagClient.enabled(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId)) + || FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); final var replicationWorker = new DefaultReplicationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), @@ -173,7 +183,7 @@ public Optional runJob() throws Exception { new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(), WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, new ConnectorConfigUpdater(sourceApi, destinationApi), - FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId())); + fieldSelectionEnabled); log.info("Running replication worker..."); final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(), diff --git a/airbyte-featureflag/src/main/kotlin/Flags.kt b/airbyte-featureflag/src/main/kotlin/Flags.kt index aed1ec2bb7638..cf40fc097f469 100644 --- a/airbyte-featureflag/src/main/kotlin/Flags.kt +++ b/airbyte-featureflag/src/main/kotlin/Flags.kt @@ -14,10 +14,14 @@ object LogConnectorMessages : EnvVar(envVar = "LOG_CONNECTOR_MESSAGES") object StreamCapableState : EnvVar(envVar = "USE_STREAM_CAPABLE_STATE") object AutoDetectSchema : EnvVar(envVar = "AUTO_DETECT_SCHEMA") object NeedStateValidation : EnvVar(envVar = "NEED_STATE_VALIDATION") +// NOTE: this is deprecated in favor of FieldSelectionEnabled and will be removed once that flag is fully deployed. object ApplyFieldSelection : EnvVar(envVar = "APPLY_FIELD_SELECTION") object PerfBackgroundJsonValidation : Temporary(key = "performance.backgroundJsonSchemaValidation") +object FieldSelectionEnabled : Temporary(key="connection.columnSelection") + +// NOTE: this is deprecated in favor of FieldSelectionEnabled and will be removed once that flag is fully deployed. object FieldSelectionWorkspaces : EnvVar(envVar = "FIELD_SELECTION_WORKSPACES") { override fun enabled(ctx: Context): Boolean { val enabledWorkspaceIds: List = fetcher(key) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 739fd8ae58478..4a5a1224c8b58 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -39,6 +39,8 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.featureflag.FeatureFlagClient; +import io.airbyte.featureflag.FieldSelectionEnabled; +import io.airbyte.featureflag.Workspace; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; @@ -313,6 +315,14 @@ private CheckedSupplier, Exception> final MetricClient metricClient = MetricClientFactory.getMetricClient(); final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); + final UUID workspaceId = syncInput.getWorkspaceId(); + // NOTE: we apply field selection if the feature flag client says so (recommended) or the old + // environment-variable flags say so (deprecated). + // The latter FeatureFlagHelper will be removed once the flag client is fully deployed. + final boolean fieldSelectionEnabled = workspaceId != null && + (featureFlagClient.enabled(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId)) + || FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); + return new DefaultReplicationWorker( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), @@ -329,7 +339,7 @@ private CheckedSupplier, Exception> new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(), WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, new ConnectorConfigUpdater(airbyteApiClient.getSourceApi(), airbyteApiClient.getDestinationApi()), - FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId())); + fieldSelectionEnabled); }; }