Skip to content

Commit

Permalink
Add acceptance test for deleting connetion (#11563)
Browse files Browse the repository at this point in the history
* Add acceptance test for deleting connetion in bad temporal state

* disable new test on kube

* try using different temproalHost

* add log info line to give time for temporal client to spin up

* try avoiding missing row in temporal db

* check if temporal workflow is reachable

* check if temporal workflow is reachable

* check if temporal workflow is reachable

* try waiting for connection state

* try using airbyte-temporal hostname

* Revert "try using airbyte-temporal hostname"

This reverts commit 0e53a27.

* Revert back to using localhost

* Add 5 second wait

* only enable test for new scheduler

* only enable test for new scheduler 2

* refactor test to cover normal and unexpected temporal state
  • Loading branch information
terencecho authored Apr 4, 2022
1 parent 2d27774 commit 568f112
Showing 1 changed file with 77 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@
import io.airbyte.db.Databases;
import io.airbyte.test.airbyte_test_container.AirbyteTestContainer;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
import java.io.IOException;
import java.net.Inet4Address;
Expand Down Expand Up @@ -783,7 +787,7 @@ public void testCheckpointing() throws Exception {
// now cancel it so that we freeze state!
try {
apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId()));
} catch (Exception e) {}
} catch (final Exception e) {}

final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId);

Expand Down Expand Up @@ -1146,6 +1150,78 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception
assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus());
}

@Test
@Order(22)
public void testDeleteConnection() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = createPostgresSource().getSourceId();
final UUID destinationId = createDestination().getDestinationId();
final UUID operationId = createOperation().getOperationId();
final AirbyteCatalog catalog = discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.primaryKey(List.of(List.of(COLUMN_NAME))));

UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));

// test normal deletion of connection
apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId));

// remove connection to avoid exception during tear down
connectionIds.remove(connectionId);

LOGGER.info("Waiting for connection to be deleted...");
Thread.sleep(500);

ConnectionStatus connectionStatus =
apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus();
assertEquals(ConnectionStatus.DEPRECATED, connectionStatus);

// test deletion of connection when temporal workflow is in a bad state, only when using new
// scheduler
final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
if (featureFlags.usesNewScheduler()) {
LOGGER.info("Testing connection deletion when temporal is in a terminal state");
connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

terminateTemporalWorkflow(connectionId);

// we should still be able to delete the connection when the temporal workflow is in this state
apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for connection to be deleted...");
Thread.sleep(500);

connectionStatus = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus();
assertEquals(ConnectionStatus.DEPRECATED, connectionStatus);
}
}

private void terminateTemporalWorkflow(final UUID connectionId) {
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233");
final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService);

// check if temporal workflow is reachable
final ConnectionManagerWorkflow connectionManagerWorkflow =
workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId);
connectionManagerWorkflow.getState();

// Terminate workflow
LOGGER.info("Terminating temporal workflow...");
workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate("");

// remove connection to avoid exception during tear down
connectionIds.remove(connectionId);
}

private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException {
return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog();
}
Expand Down

0 comments on commit 568f112

Please sign in to comment.