Skip to content

Commit

Permalink
Suggested Streams via Actor Definition (#21577)
Browse files Browse the repository at this point in the history
* Suggested Streams in in the Actor Definition

* Fix steam addition

* fix tests

* enable faker streams

* test resilaince.

* lint

* fig configLookup

* fixup definition load in webBackend

* fix build/tests

* Include 'suggested' in discover API response

* fix test

* Update airbyte-api/src/main/openapi/config.yaml

Co-authored-by: Pedro S. Lopez <[email protected]>

* update build with typo

* fix test

* remove comment

* add more context

---------

Co-authored-by: Pedro S. Lopez <[email protected]>
  • Loading branch information
evantahler and pedroslopez authored Jan 27, 2023
1 parent 908a93f commit b9e3c8a
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 74 deletions.
15 changes: 9 additions & 6 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ paths:
description: |
Apply a patch-style update to a connection. Only fields present on the update request body will be updated.
Note that if a catalog is present in the request body, the connection's entire catalog will be replaced
with the catalog from the request. This means that to modify a single stream, the entire new catalog
with the catalog from the request. This means that to modify a single stream, the entire new catalog
containing the updated stream needs to be sent.
operationId: updateConnection
requestBody:
Expand Down Expand Up @@ -1883,7 +1883,7 @@ paths:
connection along with the rest of the operationIds in the request body.
Apply a patch-style update to a connection. Only fields present on the update request body will be updated.
Note that if a catalog is present in the request body, the connection's entire catalog will be replaced
with the catalog from the request. This means that to modify a single stream, the entire new catalog
with the catalog from the request. This means that to modify a single stream, the entire new catalog
containing the updated stream needs to be sent.
operationId: webBackendUpdateConnection
requestBody:
Expand Down Expand Up @@ -1952,9 +1952,9 @@ paths:
- web_backend
description: Returns all available geographies in which a data sync can run.
summary: |
Returns available geographies can be selected to run data syncs in a particular geography.
The 'auto' entry indicates that the sync will be automatically assigned to a geography according
to the platform default behavior. Entries other than 'auto' are two-letter country codes that
Returns available geographies can be selected to run data syncs in a particular geography.
The 'auto' entry indicates that the sync will be automatically assigned to a geography according
to the platform default behavior. Entries other than 'auto' are two-letter country codes that
follow the ISO 3166-1 alpha-2 standard.
operationId: webBackendListGeographies
responses:
Expand Down Expand Up @@ -3879,7 +3879,10 @@ components:
description: Alias name to the stream to be used in the destination
type: string
selected:
description: If this is true, the stream is selected with all of its properties.
description: If this is true, the stream is selected with all of its properties. For new connections, this considers if the stream is suggested or not
type: boolean
suggested:
description: Does the connector suggest that this stream be enabled by default?
type: boolean
fieldSelectionEnabled:
description: Whether field selection should be enabled. If this is true, only the properties in `selectedFields` will be included.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_MIGRATION_VERSION = "0.40.27.001";
private static final String CURRENT_MIGRATION_VERSION = "0.40.28.001";

@BeforeEach
void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,9 @@ public Optional<AirbyteCatalog> getConnectionAirbyteCatalog(final UUID connectio
return Optional.empty();
}
final ActorCatalog catalog = configRepository.getActorCatalogById(connection.getSourceCatalogId());
return Optional.of(CatalogConverter.toApi(Jsons.object(catalog.getCatalog(),
io.airbyte.protocol.models.AirbyteCatalog.class)));
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromSource(connection.getSourceId());
final io.airbyte.protocol.models.AirbyteCatalog jsonCatalog = Jsons.object(catalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class);
return Optional.of(CatalogConverter.toApi(jsonCatalog, sourceDefinition));
}

public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
connectorVersion,
new Version(sourceDef.getProtocolVersion()),
isCustomConnector);
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId);
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId, sourceDef);

if (discoverSchemaRequestBody.getConnectionId() != null) {
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
Expand All @@ -286,7 +286,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
.logs(new LogRead().logLines(new ArrayList<>()))
.succeeded(true);
return new SourceDiscoverSchemaRead()
.catalog(CatalogConverter.toApi(airbyteCatalog))
.catalog(CatalogConverter.toApi(airbyteCatalog, sourceDef))
.jobInfo(emptyJob)
.catalogId(currentCatalog.get().getId());
}
Expand All @@ -313,18 +313,19 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So
new Version(
sourceDef.getProtocolVersion()),
isCustomConnector);
return retrieveDiscoveredSchema(response);
return retrieveDiscoveredSchema(response, sourceDef);
}

private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousResponse<UUID> response) throws ConfigNotFoundException, IOException {
private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousResponse<UUID> response, final StandardSourceDefinition sourceDef)
throws ConfigNotFoundException, IOException {
final SourceDiscoverSchemaRead sourceDiscoverSchemaRead = new SourceDiscoverSchemaRead()
.jobInfo(jobConverter.getSynchronousJobRead(response));

if (response.isSuccess()) {
final ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput());
final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(),
io.airbyte.protocol.models.AirbyteCatalog.class);
sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog));
sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog, sourceDef));
sourceDiscoverSchemaRead.catalogId(response.getOutput());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.airbyte.commons.server.scheduler.EventRunner;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
Expand Down Expand Up @@ -544,8 +545,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
if (mostRecentActorCatalog.isPresent()) {
final io.airbyte.protocol.models.AirbyteCatalog mostRecentAirbyteCatalog =
Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class);
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromSource(originalConnectionRead.getSourceId());
final CatalogDiff catalogDiff =
connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog),
connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog, sourceDefinition),
CatalogConverter.toConfiguredProtocol(newAirbyteCatalog));
breakingChange = containsBreakingChange(catalogDiff);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.text.Names;
import io.airbyte.config.FieldSelectionData;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonValidationException;
import java.util.ArrayList;
Expand Down Expand Up @@ -103,29 +104,57 @@ private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(fin
.withNamespace(stream.getNamespace());
}

public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final io.airbyte.protocol.models.AirbyteCatalog catalog) {
public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final io.airbyte.protocol.models.AirbyteCatalog catalog,
StandardSourceDefinition sourceDefinition) {
List<String> suggestedStreams = new ArrayList<>();
Boolean suggestingStreams;

// There are occasions in tests where we have not seeded the sourceDefinition fully. This is to
// prevent those tests from failing
if (sourceDefinition != null) {
suggestingStreams = sourceDefinition.getSuggestedStreams() != null;
if (suggestingStreams) {
suggestedStreams.addAll(sourceDefinition.getSuggestedStreams().getStreams());
}
} else {
suggestingStreams = false;
}

return new io.airbyte.api.model.generated.AirbyteCatalog()
.streams(catalog.getStreams()
.stream()
.map(CatalogConverter::toApi)
.map(s -> new io.airbyte.api.model.generated.AirbyteStreamAndConfiguration()
.stream(s)
.config(generateDefaultConfiguration(s)))
.config(generateDefaultConfiguration(s, suggestingStreams, suggestedStreams)))
.collect(Collectors.toList()));
}

private static io.airbyte.api.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration(final io.airbyte.api.model.generated.AirbyteStream stream) {
private static io.airbyte.api.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration(final io.airbyte.api.model.generated.AirbyteStream stream,
Boolean suggestingStreams,
List<String> suggestedStreams) {
final io.airbyte.api.model.generated.AirbyteStreamConfiguration result = new io.airbyte.api.model.generated.AirbyteStreamConfiguration()
.aliasName(Names.toAlphanumericAndUnderscore(stream.getName()))
.cursorField(stream.getDefaultCursorField())
.destinationSyncMode(io.airbyte.api.model.generated.DestinationSyncMode.APPEND)
.primaryKey(stream.getSourceDefinedPrimaryKey())
.selected(true);
.selected(!suggestingStreams)
.suggested(true);

if (suggestingStreams) {
if (suggestedStreams.contains(stream.getName())) {
result.setSelected(true);
} else {
result.setSuggested(false);
}
}

if (stream.getSupportedSyncModes().size() > 0) {
result.setSyncMode(stream.getSupportedSyncModes().get(0));
} else {
result.setSyncMode(io.airbyte.api.model.generated.SyncMode.INCREMENTAL);
}

return result;
}

Expand Down
Loading

0 comments on commit b9e3c8a

Please sign in to comment.