diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 4abf9471c1ea..1d52311e0797 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -80,8 +80,12 @@ import io.airbyte.db.Databases; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; import io.airbyte.test.utils.PostgreSQLContainerHelper; +import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.File; import java.io.IOException; import java.net.Inet4Address; @@ -783,7 +787,7 @@ public void testCheckpointing() throws Exception { // now cancel it so that we freeze state! try { apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId())); - } catch (Exception e) {} + } catch (final Exception e) {} final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId); @@ -1146,6 +1150,78 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus()); } + @Test + @Order(22) + public void testDeleteConnection() throws Exception { + final String connectionName = "test-connection"; + final UUID sourceId = createPostgresSource().getSourceId(); + final UUID destinationId = createDestination().getDestinationId(); + final UUID operationId = createOperation().getOperationId(); + final AirbyteCatalog catalog = discoverSourceSchema(sourceId); + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode) + .primaryKey(List.of(List.of(COLUMN_NAME)))); + + UUID connectionId = + createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); + + // test normal deletion of connection + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + // remove connection to avoid exception during tear down + connectionIds.remove(connectionId); + + LOGGER.info("Waiting for connection to be deleted..."); + Thread.sleep(500); + + ConnectionStatus connectionStatus = + apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + + // test deletion of connection when temporal workflow is in a bad state, only when using new + // scheduler + final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (featureFlags.usesNewScheduler()) { + LOGGER.info("Testing connection deletion when temporal is in a terminal state"); + connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + terminateTemporalWorkflow(connectionId); + + // we should still be able to delete the connection when the temporal workflow is in this state + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + LOGGER.info("Waiting for connection to be deleted..."); + Thread.sleep(500); + + connectionStatus = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + } + } + + private void terminateTemporalWorkflow(final UUID connectionId) { + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); + final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); + + // check if temporal workflow is reachable + final ConnectionManagerWorkflow connectionManagerWorkflow = + workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); + connectionManagerWorkflow.getState(); + + // Terminate workflow + LOGGER.info("Terminating temporal workflow..."); + workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate(""); + + // remove connection to avoid exception during tear down + connectionIds.remove(connectionId); + } + private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException { return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog(); }