diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 8327144742a18..9ce6cd9336026 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -28,20 +28,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; import io.airbyte.commons.json.Jsons; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.commons.lang3.StringUtils; public class CatalogHelpers { @@ -143,80 +138,4 @@ protected static Set getAllFieldNames(JsonNode node) { return allFieldNames; } - /** - * @param identifier stream name or field name - * @return if the identifier matches the alphanumeric+underscore requirement for identifiers - */ - public static boolean isValidIdentifier(String identifier) { - // todo (cgardens) - remove $ once mailchimp is fixed. - final String s = identifier.replaceAll("[-_.$]", ""); - return StringUtils.isAlphanumeric(s); - } - - /** - * @param catalog airbyte catalog - * @return list of stream names in the catalog that are invalid - */ - public static List getInvalidStreamNames(AirbyteCatalog catalog) { - return getInvalidStreamNames(catalog.getStreams().stream().map(AirbyteStream::getName)); - } - - /** - * @param catalog configured airbyte catalog - * @return list of stream names in the catalog that are invalid - */ - public static List getInvalidStreamNames(ConfiguredAirbyteCatalog catalog) { - return getInvalidStreamNames(catalog.getStreams().stream().map(ConfiguredAirbyteStream::getStream).map(AirbyteStream::getName)); - } - - private static List getInvalidStreamNames(Stream names) { - return names - .filter(streamName -> !isValidIdentifier(streamName)) - .collect(Collectors.toList()); - } - - /** - * @param catalog airbyte catalog - * @return multimap of stream names to all invalid field names in that stream - */ - public static Multimap getInvalidFieldNames(AirbyteCatalog catalog) { - return getInvalidFieldNames(getStreamNameToJsonSchema(catalog)); - } - - /** - * @param catalog configured airbyte catalog - * @return multimap of stream names to all invalid field names in that stream - */ - public static Multimap getInvalidFieldNames(ConfiguredAirbyteCatalog catalog) { - return getInvalidFieldNames(getStreamNameToJsonSchema(catalog)); - } - - private static Map getStreamNameToJsonSchema(AirbyteCatalog catalog) { - return catalog.getStreams() - .stream() - .collect(Collectors.toMap(AirbyteStream::getName, AirbyteStream::getJsonSchema)); - } - - private static Map getStreamNameToJsonSchema(ConfiguredAirbyteCatalog catalog) { - return catalog.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .collect(Collectors.toMap(AirbyteStream::getName, AirbyteStream::getJsonSchema)); - } - - private static Multimap getInvalidFieldNames(Map streamNameToJsonSchema) { - final Multimap streamNameToInvalidFieldNames = Multimaps.newSetMultimap(new HashMap<>(), HashSet::new); - - for (final Map.Entry entry : streamNameToJsonSchema.entrySet()) { - final Set invalidFieldNames = getAllFieldNames(entry.getValue()) - .stream() - .filter(streamName -> !isValidIdentifier(streamName)) - .collect(Collectors.toSet()); - - streamNameToInvalidFieldNames.putAll(entry.getKey(), invalidFieldNames); - } - - return streamNameToInvalidFieldNames; - } - } diff --git a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index 74a157137da3e..4153a852078ad 100644 --- a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -25,21 +25,14 @@ package io.airbyte.protocol.models; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertIterableEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; import java.io.IOException; -import java.util.Collections; -import java.util.List; import java.util.Set; import org.junit.jupiter.api.Test; @@ -62,46 +55,6 @@ void testGetTopLevelFieldNames() { assertEquals(Sets.newHashSet("name"), actualFieldNames); } - @Test - void testValidIdentifiers() { - assertTrue(CatalogHelpers.isValidIdentifier("identifier_name")); - assertTrue(CatalogHelpers.isValidIdentifier("iDenTiFieR_name")); - assertTrue(CatalogHelpers.isValidIdentifier("__identifier_name")); - assertTrue(CatalogHelpers.isValidIdentifier("IDENTIFIER_NAME")); - assertTrue(CatalogHelpers.isValidIdentifier("123identifier_name")); - assertTrue(CatalogHelpers.isValidIdentifier("i0d0e0n0t0i0f0i0e0r0n0a0m0e")); - assertTrue(CatalogHelpers.isValidIdentifier("identifiêr")); - assertTrue(CatalogHelpers.isValidIdentifier("a_unicode_name_文")); - assertTrue(CatalogHelpers.isValidIdentifier("identifier__name__")); - assertTrue(CatalogHelpers.isValidIdentifier("identifier-name.weee")); - } - - @Test - void testInvalidIdentifiers() { - assertFalse(CatalogHelpers.isValidIdentifier("\"identifier name")); - assertFalse(CatalogHelpers.isValidIdentifier("identifier name")); - assertFalse(CatalogHelpers.isValidIdentifier("identifier%")); - assertFalse(CatalogHelpers.isValidIdentifier("`identifier`")); - assertFalse(CatalogHelpers.isValidIdentifier("'identifier'")); - } - - @Test - void testGetInvalidStreamNames() { - final String validStreamName = "Valid_Stream"; - final AirbyteStream validStream = new AirbyteStream(); - validStream.setName(validStreamName); - - final String invalidStreamName = "invalid stream"; - AirbyteStream invalidStream = new AirbyteStream(); - invalidStream.setName(invalidStreamName); - - AirbyteCatalog catalog = new AirbyteCatalog(); - catalog.setStreams(List.of(validStream, invalidStream)); - - List invalidStreamNames = CatalogHelpers.getInvalidStreamNames(catalog); - assertIterableEquals(Collections.singleton(invalidStreamName), invalidStreamNames); - } - @Test void testGetFieldNames() throws IOException { JsonNode node = Jsons.deserialize(MoreResources.readResource("valid_schema.json")); @@ -111,26 +64,4 @@ void testGetFieldNames() throws IOException { assertEquals(expectedFieldNames, actualFieldNames); } - @Test - void testGetInvalidFieldNames() throws IOException { - final String validStreamName = "Valid_Stream"; - final AirbyteStream validStream = new AirbyteStream(); - validStream.setName(validStreamName); - JsonNode validSchema = Jsons.deserialize(MoreResources.readResource("valid_schema.json")); - validStream.setJsonSchema(validSchema); - - final String invalidStreamName = "invalid stream"; - AirbyteStream invalidStream = new AirbyteStream(); - invalidStream.setName(invalidStreamName); - JsonNode invalidSchema = Jsons.deserialize(MoreResources.readResource("invalid_schema.json")); - invalidStream.setJsonSchema(invalidSchema); - - AirbyteCatalog catalog = new AirbyteCatalog(); - catalog.setStreams(List.of(validStream, invalidStream)); - - Multimap streamNameToInvalidFieldNames = CatalogHelpers.getInvalidFieldNames(catalog); - assertIterableEquals(Collections.singleton(invalidStreamName), streamNameToInvalidFieldNames.keySet()); - assertIterableEquals(ImmutableList.of("\"CZK", "C A D"), streamNameToInvalidFieldNames.get(invalidStreamName)); - } - } diff --git a/airbyte-protocol/models/src/test/resources/invalid_schema.json b/airbyte-protocol/models/src/test/resources/invalid_schema.json deleted file mode 100644 index 198cf219e2f38..0000000000000 --- a/airbyte-protocol/models/src/test/resources/invalid_schema.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "type": "object", - "properties": { - "date": { "type": "string", "format": "date-time" }, - "C A D": { "type": ["null", "number"] }, - "HKD": { "type": ["null", "number"] }, - "ISK": { "type": ["null", "number"] }, - "PHP": { "type": ["null", "number"] }, - "DKK": { "type": ["null", "number"] }, - "HUF": { "type": ["null", "number"] }, - "\"CZK": { "type": ["null", "number"] } - } -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java index e6dba4f4f4b66..b9cf17151b079 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java @@ -25,7 +25,6 @@ package io.airbyte.workers; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.Multimap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; @@ -34,13 +33,11 @@ import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory; import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory; import java.io.InputStream; import java.nio.file.Path; -import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -94,20 +91,6 @@ public OutputAndStatus run(final StandardDiscover return new OutputAndStatus<>(JobStatus.FAILED); } - List invalidStreamNames = CatalogHelpers.getInvalidStreamNames(catalog.get()); - - if (!invalidStreamNames.isEmpty()) { - invalidStreamNames.forEach(streamName -> LOGGER.error("Cannot sync invalid stream name: " + streamName)); - return new OutputAndStatus<>(JobStatus.FAILED); - } - - Multimap streamNameToInvalidFieldNames = CatalogHelpers.getInvalidFieldNames(catalog.get()); - if (!streamNameToInvalidFieldNames.isEmpty()) { - streamNameToInvalidFieldNames - .forEach((streamName, fieldNames) -> LOGGER.error("Cannot sync invalid field names for stream " + streamName + ": " + fieldNames)); - return new OutputAndStatus<>(JobStatus.FAILED); - } - return new OutputAndStatus<>( JobStatus.SUCCEEDED, new StandardDiscoverCatalogOutput() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java index 2e4c12878e710..f476e03a60d5b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java @@ -24,7 +24,6 @@ package io.airbyte.workers; -import com.google.common.collect.Sets; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; @@ -32,9 +31,6 @@ import io.airbyte.config.StandardTargetConfig; import io.airbyte.config.State; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.SyncMode; import io.airbyte.workers.normalization.NormalizationRunner; import io.airbyte.workers.protocols.Destination; @@ -42,10 +38,7 @@ import io.airbyte.workers.protocols.Source; import java.nio.file.Files; import java.nio.file.Path; -import java.util.HashSet; -import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -82,9 +75,6 @@ public OutputAndStatus run(StandardSyncInput syncInput, Path .stream() .collect(Collectors.toMap(s -> s.getStream().getName(), s -> s.getSyncMode() != null ? s.getSyncMode() : SyncMode.FULL_REFRESH))); - // clean catalog object - removeInvalidStreams(syncInput.getCatalog()); - final StandardTapConfig tapConfig = WorkerUtils.syncToTapConfig(syncInput); final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput); @@ -97,12 +87,8 @@ public OutputAndStatus run(StandardSyncInput syncInput, Path if (maybeMessage.isPresent()) { final AirbyteMessage message = maybeMessage.get(); - if (message.getType().equals(AirbyteMessage.Type.RECORD) && !CatalogHelpers.isValidIdentifier(message.getRecord().getStream())) { - LOGGER.error("Filtered out record for invalid stream: " + message.getRecord().getStream()); - } else { - messageTracker.accept(message); - destination.accept(message); - } + messageTracker.accept(message); + destination.accept(message); } } @@ -145,16 +131,4 @@ public void cancel() { cancelled.set(true); } - private void removeInvalidStreams(ConfiguredAirbyteCatalog catalog) { - final Set invalidStreams = Sets.union( - new HashSet<>(CatalogHelpers.getInvalidStreamNames(catalog)), - CatalogHelpers.getInvalidFieldNames(catalog).keySet()); - - final List streams = catalog.getStreams().stream() - .filter(stream -> !invalidStreams.contains(stream.getStream().getName())) - .collect(Collectors.toList()); - - catalog.setStreams(streams); - } - } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java index 71fe79b43f863..f95d1298afa7b 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java @@ -26,19 +26,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.airbyte.commons.json.Jsons; +import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardTapConfig; import io.airbyte.config.StandardTargetConfig; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.workers.normalization.NormalizationRunner; import io.airbyte.workers.protocols.airbyte.AirbyteDestination; import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker; @@ -46,9 +42,8 @@ import io.airbyte.workers.protocols.airbyte.AirbyteSource; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,17 +51,15 @@ class DefaultSyncWorkerTest { private static final Path WORKSPACE_ROOT = Path.of("workspaces/10"); private static final String STREAM_NAME = "user_preferences"; - private static final String INVALID_STREAM_NAME = "invalid stream name"; private static final String FIELD_NAME = "favorite_color"; private static final AirbyteMessage RECORD_MESSAGE1 = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"); private static final AirbyteMessage RECORD_MESSAGE2 = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "yellow"); - private static final AirbyteMessage INVALID_RECORD_MESSAGE = AirbyteMessageUtils.createRecordMessage(INVALID_STREAM_NAME, FIELD_NAME, "yellow"); private Path jobRoot; private Path normalizationRoot; private AirbyteSource tap; private AirbyteDestination target; - private StandardSyncInput invalidSyncInput; + private StandardSyncInput syncInput; private StandardTapConfig tapConfig; private StandardTargetConfig targetConfig; private NormalizationRunner normalizationRunner; @@ -77,45 +70,26 @@ void setup() throws Exception { jobRoot = Files.createDirectories(Files.createTempDirectory("test").resolve(WORKSPACE_ROOT)); normalizationRoot = jobRoot.resolve("normalize"); - final StandardSyncInput validSyncInput = TestConfigHelpers.createSyncConfig().getValue(); + final ImmutablePair syncPair = TestConfigHelpers.createSyncConfig(); + syncInput = syncPair.getValue(); - // create sync input with invalid stream to ensure it is filtered out - invalidSyncInput = new StandardSyncInput(); - invalidSyncInput.setConnectionId(validSyncInput.getConnectionId()); - invalidSyncInput.setDestinationConnection(validSyncInput.getDestinationConnection()); - invalidSyncInput.setSourceConnection(validSyncInput.getSourceConnection()); - invalidSyncInput.setState(validSyncInput.getState()); - invalidSyncInput.setSyncMode(validSyncInput.getSyncMode()); - - final ConfiguredAirbyteStream invalidStream = new ConfiguredAirbyteStream() - .withStream(new AirbyteStream() - .withName(INVALID_STREAM_NAME) - .withJsonSchema(Jsons.deserialize("{}"))); - final List streams = new ArrayList<>(validSyncInput.getCatalog().getStreams()); - streams.add(invalidStream); - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog(); - catalog.setStreams(streams); - invalidSyncInput.setCatalog(catalog); - - tapConfig = WorkerUtils.syncToTapConfig(validSyncInput); - targetConfig = WorkerUtils.syncToTargetConfig(validSyncInput); + tapConfig = WorkerUtils.syncToTapConfig(syncInput); + targetConfig = WorkerUtils.syncToTargetConfig(syncInput); tap = mock(AirbyteSource.class); target = mock(AirbyteDestination.class); normalizationRunner = mock(NormalizationRunner.class); - when(tap.isFinished()).thenReturn(false, false, false, false, true); - when(tap.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2), - Optional.of(INVALID_RECORD_MESSAGE)); + when(tap.isFinished()).thenReturn(false, false, false, true); + when(tap.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2)); when(normalizationRunner.normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog())) .thenReturn(true); } @Test void test() throws Exception { - final DefaultSyncWorker defaultSyncWorker = - new DefaultSyncWorker(tap, target, new AirbyteMessageTracker(), normalizationRunner); - final OutputAndStatus run = defaultSyncWorker.run(invalidSyncInput, jobRoot); + final DefaultSyncWorker defaultSyncWorker = new DefaultSyncWorker(tap, target, new AirbyteMessageTracker(), normalizationRunner); + final OutputAndStatus run = defaultSyncWorker.run(syncInput, jobRoot); assertEquals(JobStatus.SUCCEEDED, run.getStatus()); @@ -123,7 +97,6 @@ void test() throws Exception { verify(target).start(targetConfig, jobRoot); verify(target).accept(RECORD_MESSAGE1); verify(target).accept(RECORD_MESSAGE2); - verify(target, never()).accept(INVALID_RECORD_MESSAGE); verify(normalizationRunner).start(); verify(normalizationRunner).normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog()); verify(normalizationRunner).close(); diff --git a/docs/architecture/airbyte-specification.md b/docs/architecture/airbyte-specification.md index 36da3ba3ea3f7..77e28b7bb8f78 100644 --- a/docs/architecture/airbyte-specification.md +++ b/docs/architecture/airbyte-specification.md @@ -162,7 +162,7 @@ read(Config, AirbyteCatalog, State) -> Stream } ``` -**Note:** Airbyte only supports stream and field names that contain only Unicode alphabet characters, numbers, or underscores. The discovery process will fail if any stream names or field names are invalid. Syncs will filter out any stream that has an invalid name or any invalid fields. +**Note:** Airbyte only supports stream and field names can be any UTF8 string. Destinations are responsible for cleaning these names to make them valid table and column names in their respective data stores. #### Read