From 568f11242db9bbbeb519d2dddcc28036f0b2a3ff Mon Sep 17 00:00:00 2001 From: terencecho Date: Mon, 4 Apr 2022 10:51:03 -0700 Subject: [PATCH] Add acceptance test for deleting connection (#11563) * Add acceptance test for deleting connection in bad temporal state * disable new test on kube * try using different temporalHost * add log info line to give time for temporal client to spin up * try avoiding missing row in temporal db * check if temporal workflow is reachable * check if temporal workflow is reachable * check if temporal workflow is reachable * try waiting for connection state * try using airbyte-temporal hostname * Revert "try using airbyte-temporal hostname" This reverts commit 0e53a27622262c04aac45f518578dc3068edf625. * Revert back to using localhost * Add 5 second wait * only enable test for new scheduler * only enable test for new scheduler 2 * refactor test to cover normal and unexpected temporal state --- .../test/acceptance/AcceptanceTests.java | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 4abf9471c1ea3..1d52311e0797a 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -80,8 +80,12 @@ import io.airbyte.db.Databases; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; import io.airbyte.test.utils.PostgreSQLContainerHelper; +import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.File; 
import java.io.IOException; import java.net.Inet4Address; @@ -783,7 +787,7 @@ public void testCheckpointing() throws Exception { // now cancel it so that we freeze state! try { apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId())); - } catch (Exception e) {} + } catch (final Exception e) {} final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId); @@ -1146,6 +1150,78 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus()); } + @Test + @Order(22) + public void testDeleteConnection() throws Exception { + final String connectionName = "test-connection"; + final UUID sourceId = createPostgresSource().getSourceId(); + final UUID destinationId = createDestination().getDestinationId(); + final UUID operationId = createOperation().getOperationId(); + final AirbyteCatalog catalog = discoverSourceSchema(sourceId); + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode) + .primaryKey(List.of(List.of(COLUMN_NAME)))); + + UUID connectionId = + createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); + + // test normal deletion of connection + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + // remove connection to avoid exception during tear down + connectionIds.remove(connectionId); + + LOGGER.info("Waiting for connection to 
be deleted..."); + Thread.sleep(500); + + ConnectionStatus connectionStatus = + apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + + // test deletion of connection when temporal workflow is in a bad state, only when using new + // scheduler + final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + if (featureFlags.usesNewScheduler()) { + LOGGER.info("Testing connection deletion when temporal is in a terminal state"); + connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + + terminateTemporalWorkflow(connectionId); + + // we should still be able to delete the connection when the temporal workflow is in this state + apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + + LOGGER.info("Waiting for connection to be deleted..."); + Thread.sleep(500); + + connectionStatus = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus(); + assertEquals(ConnectionStatus.DEPRECATED, connectionStatus); + } + } + + private void terminateTemporalWorkflow(final UUID connectionId) { + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233"); + final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService); + + // check if temporal workflow is reachable + final ConnectionManagerWorkflow connectionManagerWorkflow = + workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId); + connectionManagerWorkflow.getState(); + + // Terminate workflow + LOGGER.info("Terminating temporal workflow..."); + workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate(""); + + // remove connection to avoid exception during tear down + 
connectionIds.remove(connectionId); + } + private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException { return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog(); }