From 1eabbf91295bba549ba7d0f38bfc227dea773cbb Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Wed, 28 Jul 2021 16:39:52 +0200 Subject: [PATCH 01/18] Initial version of Apify Dataset source connector --- .../source-apify-dataset/.dockerignore | 7 + .../source-apify-dataset/Dockerfile | 16 ++ .../connectors/source-apify-dataset/README.md | 131 +++++++++++++++ .../acceptance-test-config.yml | 19 +++ .../acceptance-test-docker.sh | 16 ++ .../source-apify-dataset/build.gradle | 14 ++ .../integration_tests/__init__.py | 0 .../integration_tests/acceptance.py | 34 ++++ .../integration_tests/catalog.json | 13 ++ .../integration_tests/configured_catalog.json | 16 ++ .../integration_tests/invalid_config.json | 6 + .../connectors/source-apify-dataset/main.py | 33 ++++ .../source-apify-dataset/requirements.txt | 3 + .../connectors/source-apify-dataset/setup.py | 46 ++++++ .../source_apify_dataset/__init__.py | 26 +++ .../source_apify_dataset/source.py | 151 ++++++++++++++++++ .../source_apify_dataset/spec.json | 28 ++++ .../unit_tests/unit_test.py | 27 ++++ 18 files changed, 586 insertions(+) create mode 100644 airbyte-integrations/connectors/source-apify-dataset/.dockerignore create mode 100644 airbyte-integrations/connectors/source-apify-dataset/Dockerfile create mode 100644 airbyte-integrations/connectors/source-apify-dataset/README.md create mode 100644 airbyte-integrations/connectors/source-apify-dataset/acceptance-test-config.yml create mode 100755 airbyte-integrations/connectors/source-apify-dataset/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-apify-dataset/build.gradle create mode 100644 airbyte-integrations/connectors/source-apify-dataset/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-apify-dataset/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-apify-dataset/integration_tests/catalog.json create mode 100644 
airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-apify-dataset/main.py create mode 100644 airbyte-integrations/connectors/source-apify-dataset/requirements.txt create mode 100644 airbyte-integrations/connectors/source-apify-dataset/setup.py create mode 100644 airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/__init__.py create mode 100644 airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py create mode 100644 airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json create mode 100644 airbyte-integrations/connectors/source-apify-dataset/unit_tests/unit_test.py diff --git a/airbyte-integrations/connectors/source-apify-dataset/.dockerignore b/airbyte-integrations/connectors/source-apify-dataset/.dockerignore new file mode 100644 index 0000000000000..20d9889b51a04 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/.dockerignore @@ -0,0 +1,7 @@ +* +!Dockerfile +!Dockerfile.test +!main.py +!source_apify_dataset +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-apify-dataset/Dockerfile b/airbyte-integrations/connectors/source-apify-dataset/Dockerfile new file mode 100644 index 0000000000000..2db9d28b2d1e9 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.7-slim + +# Bash is installed for more convenient debugging. +RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* + +WORKDIR /airbyte/integration_code +COPY source_apify_dataset ./source_apify_dataset +COPY main.py ./ +COPY setup.py ./ +RUN pip install . 
+ +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-apify-dataset diff --git a/airbyte-integrations/connectors/source-apify-dataset/README.md b/airbyte-integrations/connectors/source-apify-dataset/README.md new file mode 100644 index 0000000000000..5c83f60071e61 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/README.md @@ -0,0 +1,131 @@ +# Apify Dataset Source + +This is the repository for the Apify Dataset source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/apify-dataset). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. 
+ +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-apify-dataset:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/apify-dataset) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_apify_dataset/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +You can get your Apify credentials from Settings > Integration [section](https://my.apify.com/account#/integrations) of the Apify app + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source apify-dataset test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-apify-dataset:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-apify-dataset:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. 
+ +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-apify-dataset:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-apify-dataset:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-apify-dataset:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-apify-dataset:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](source-acceptance-tests.md) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. 
+To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-apify-dataset:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-apify-dataset:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. 
diff --git a/airbyte-integrations/connectors/source-apify-dataset/acceptance-test-config.yml b/airbyte-integrations/connectors/source-apify-dataset/acceptance-test-config.yml new file mode 100644 index 0000000000000..2085418f7487c --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/acceptance-test-config.yml @@ -0,0 +1,19 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md) +# for more information about how to configure these tests +connector_image: airbyte/source-apify-dataset:dev +tests: + spec: + - spec_path: "source_apify_dataset/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-apify-dataset/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-apify-dataset/acceptance-test-docker.sh new file mode 100755 index 0000000000000..e4d8b1cef8961 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . 
-t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-apify-dataset/build.gradle b/airbyte-integrations/connectors/source-apify-dataset/build.gradle new file mode 100644 index 0000000000000..804520b6049d9 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_apify_dataset_singer' +} + +dependencies { + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) +} diff --git a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/__init__.py b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/acceptance.py new file mode 100644 index 0000000000000..d6cbdc97c495c --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/acceptance.py @@ -0,0 +1,34 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, 
publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """ This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/catalog.json b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/catalog.json new file mode 100644 index 0000000000000..cdb25a546d965 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/catalog.json @@ -0,0 +1,13 @@ +{ + "streams": [ + { + "name": "DatasetItems", + "supported_sync_modes": ["full_refresh"], + "destination_sync_mode": "overwrite", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object" + } + } + ] + } diff --git a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..6f193ef8c6322 --- /dev/null +++ 
b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json @@ -0,0 +1,16 @@ +{ + "streams": [{ + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + "stream": { + "name": "DatasetItems", + "supported_sync_modes": ["full_refresh"], + "destination_sync_mode": "overwrite", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object" + } + } + }] + } + \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..f5cc5b4ea5e40 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json @@ -0,0 +1,6 @@ +{ + "userId": "non_existent_user_id", + "apifyToken": "non_existent_token", + "datasetId": "non_existent_dataset_it", + "clean": false +} diff --git a/airbyte-integrations/connectors/source-apify-dataset/main.py b/airbyte-integrations/connectors/source-apify-dataset/main.py new file mode 100644 index 0000000000000..1adfc25ba0b06 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/main.py @@ -0,0 +1,33 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_apify_dataset import SourceApifyDataset + +if __name__ == "__main__": + source = SourceApifyDataset() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-apify-dataset/requirements.txt b/airbyte-integrations/connectors/source-apify-dataset/requirements.txt new file mode 100644 index 0000000000000..7be17a56d745d --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/requirements.txt @@ -0,0 +1,3 @@ +# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies. +-e ../../bases/source-acceptance-test +-e . 
diff --git a/airbyte-integrations/connectors/source-apify-dataset/setup.py b/airbyte-integrations/connectors/source-apify-dataset/setup.py new file mode 100644 index 0000000000000..a90c2c67e1c99 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/setup.py @@ -0,0 +1,46 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = ["airbyte-cdk", "apify-client"] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "source-acceptance-test", +] + +setup( + name="source_apify_dataset", + description="Source implementation for Apify Dataset.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/__init__.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/__init__.py new file mode 100644 index 0000000000000..478378a7bb48c --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/__init__.py @@ -0,0 +1,26 @@ +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ + +from .source import SourceApifyDataset + +__all__ = ["SourceApifyDataset"] diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py new file mode 100644 index 0000000000000..8431973ea3fb2 --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -0,0 +1,151 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# + + +import concurrent.futures +import json +from datetime import datetime +from typing import Dict, Generator + +from airbyte_cdk.logger import AirbyteLogger +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + Status, + Type, +) +from airbyte_cdk.models.airbyte_protocol import SyncMode +from airbyte_cdk.sources import Source +from apify_client import ApifyClient + +DATASET_ITEMS_STREAM_NAME = "DatasetItems" + +BATCH_SIZE = 50000 + + +class SourceApifyDataset(Source): + def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: + """ + Tests if the input configuration can be used to successfully connect to the Apify integration. + This is tested by trying to access the Apify user object with the provided userId and Apify token. + + :param logger: Logging object to display debug/info/error to the logs + (logs will not be accessible via airbyte UI if they are not passed to this logger) + :param config: Json object containing the configuration of this source, content of this json is as specified in + the properties of the spec.json file + + :return: AirbyteConnectionStatus indicating a Success or Failure + """ + + try: + # Try to get user info using the provided userId and token. If either of them is invalid, ApifyClient throws an exception + user_id = config["userId"] + apify_token = config["apifyToken"] + + client = ApifyClient(apify_token) + client.user(user_id).get() + + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}") + + def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: + """ + Returns an AirbyteCatalog representing the available streams and fields in this integration. 
+ For example, given valid credentials to a Postgres database, + returns an Airbyte catalog where each postgres table is a stream, and each table column is a field. + + :param logger: Logging object to display debug/info/error to the logs + (logs will not be accessible via airbyte UI if they are not passed to this logger) + :param config: Json object containing the configuration of this source, content of this json is as specified in + the properties of the spec.json file + + :return: AirbyteCatalog is an object describing a list of all available streams in this source. + A stream is an AirbyteStream object that includes: + - its stream name (or table name in the case of Postgres) + - json_schema providing the specifications of expected schema for this stream (a list of columns described + by their names and types) + """ + stream_name = DATASET_ITEMS_STREAM_NAME + json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + } + + return AirbyteCatalog( + streams=[AirbyteStream(name=stream_name, supported_sync_modes=[SyncMode.full_refresh], json_schema=json_schema)] + ) + + def read( + self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any] + ) -> Generator[AirbyteMessage, None, None]: + """ + Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, + catalog, and state. + + :param logger: Logging object to display debug/info/error to the logs + (logs will not be accessible via airbyte UI if they are not passed to this logger) + :param config: Json object containing the configuration of this source, content of this json is as specified in + the properties of the spec.json file + :param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog + returned by discover(), but + in addition, it's been configured in the UI! 
For each particular stream and field, there may have been provided + with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc + :param state: When a Airbyte reads data from a source, it might need to keep a checkpoint cursor to resume + replication in the future from that saved checkpoint. + This is the object that is provided with state from previous runs and avoid replicating the entire set of + data everytime. + + :return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object. + """ + logger.info("Reading data from Apify dataset") + + apify_token = config["apifyToken"] + dataset_id = config["datasetId"] + clean = config["clean"] + + client = ApifyClient(apify_token) + dataset_client = client.dataset(dataset_id) + + # Get total number of items in dataset. This will be used in pagination + dataset = dataset_client.get() + num_items = dataset["itemCount"] + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [] + for offset in range(0, num_items, BATCH_SIZE): + limit = BATCH_SIZE + futures.append(executor.submit(dataset_client.list_items, offset=offset, limit=limit, clean=clean)) + for future in concurrent.futures.as_completed(futures): + for data in future.result().items: + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=DATASET_ITEMS_STREAM_NAME, data=data, emitted_at=int(datetime.now().timestamp()) * 1000 + ), + ) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json new file mode 100644 index 0000000000000..a394f40490a9c --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json @@ -0,0 +1,28 @@ +{ + "documentationUrl": "https://docsurl.com", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Apify Dataset 
Spec", + "type": "object", + "required": ["apifyToken", "datasetId"], + "additionalProperties": false, + "properties": { + "userId": { + "type": "string", + "description": "Your Apify user ID. You can find it here" + }, + "apifyToken": { + "type": "string", + "description": "Your Apify account API token. You can find it here" + }, + "datasetId": { + "type": "string", + "description": "ID of the dataset you would like to load to Airbyte." + }, + "clean": { + "type": "boolean", + "description": "If set to true, only clean items will be downloaded from the dataset. See description of what clean means in Apify API docs. If not sure, set clean to false." + } + } + } +} diff --git a/airbyte-integrations/connectors/source-apify-dataset/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-apify-dataset/unit_tests/unit_test.py new file mode 100644 index 0000000000000..b8a8150b507fd --- /dev/null +++ b/airbyte-integrations/connectors/source-apify-dataset/unit_tests/unit_test.py @@ -0,0 +1,27 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +def test_example_method(): + assert True From 21acfdc9b10253ad201dc5065af91a9635e51549 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Wed, 28 Jul 2021 16:57:08 +0200 Subject: [PATCH 02/18] Add apify dataset to source definition --- .../init/src/main/resources/seed/source_definitions.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f8fbacf102380..142987924796b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -414,3 +414,8 @@ dockerRepository: airbyte/source-snapchat-marketing dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/sources/snapchat-marketing +- sourceDefinitionId: 47f17145-fe20-4ef5-a548-e29b048adf84 + name: Apify Dataset + dockerRepository: airbyte/source-apify-dataset + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/sources/apify-dataset From e4f962f5c98bdcd9d528a84560c52416f1349ef2 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Wed, 28 Jul 2021 17:29:16 +0200 Subject: [PATCH 03/18] Make sure clean is False by default --- .../source-apify-dataset/source_apify_dataset/source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index 8431973ea3fb2..2b68945929682 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ 
b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -127,7 +127,7 @@ def read( apify_token = config["apifyToken"] dataset_id = config["datasetId"] - clean = config["clean"] + clean = config.get("clean", False) client = ApifyClient(apify_token) dataset_client = client.dataset(dataset_id) @@ -135,7 +135,7 @@ def read( # Get total number of items in dataset. This will be used in pagination dataset = dataset_client.get() num_items = dataset["itemCount"] - + with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for offset in range(0, num_items, BATCH_SIZE): From 01e3a9fc5cd3974cade283ec8a831d39ff62d0b3 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 10:29:23 +0200 Subject: [PATCH 04/18] Remove need for user id and token since it is not needed for reading dataset --- .../integration_tests/invalid_config.json | 4 +--- .../source_apify_dataset/source.py | 14 +++++--------- .../source_apify_dataset/spec.json | 10 +--------- 3 files changed, 7 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json index f5cc5b4ea5e40..f84d5c7a63330 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/invalid_config.json @@ -1,6 +1,4 @@ { - "userId": "non_existent_user_id", - "apifyToken": "non_existent_token", - "datasetId": "non_existent_dataset_it", + "datasetId": "non_existent_dataset_id", "clean": false } diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index 2b68945929682..4db5bc49b677d 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ 
b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -63,13 +63,10 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: """ try: - # Try to get user info using the provided userId and token. If either of them is invalid, ApifyClient throws an exception - user_id = config["userId"] - apify_token = config["apifyToken"] - - client = ApifyClient(apify_token) - client.user(user_id).get() - + dataset_id = config["datasetId"] + dataset = ApifyClient().dataset(dataset_id).get() + if dataset is None: + raise ValueError(f"Dataset {dataset_id} does not exist") return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}") @@ -125,11 +122,10 @@ def read( """ logger.info("Reading data from Apify dataset") - apify_token = config["apifyToken"] dataset_id = config["datasetId"] clean = config.get("clean", False) - client = ApifyClient(apify_token) + client = ApifyClient() dataset_client = client.dataset(dataset_id) # Get total number of items in dataset. This will be used in pagination diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json index a394f40490a9c..294e22aac155a 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json @@ -4,17 +4,9 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Apify Dataset Spec", "type": "object", - "required": ["apifyToken", "datasetId"], + "required": ["datasetId"], "additionalProperties": false, "properties": { - "userId": { - "type": "string", - "description": "Your Apify user ID. You can find it here" - }, - "apifyToken": { - "type": "string", - "description": "Your Apify account API token. 
You can find it here" - }, "datasetId": { "type": "string", "description": "ID of the dataset you would like to load to Airbyte." From 69d1227034fda1fdcfaf7d0fdde931fd03ed447e Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 10:31:45 +0200 Subject: [PATCH 05/18] Add comment --- .../source-apify-dataset/source_apify_dataset/source.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index 4db5bc49b677d..5c49dc46f1f11 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -45,6 +45,7 @@ DATASET_ITEMS_STREAM_NAME = "DatasetItems" +# Batch size for downloading dataset items from Apify dataset BATCH_SIZE = 50000 From 708e1f1610b8aeb75f725fa23095ccd09e310d69 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 10:45:48 +0200 Subject: [PATCH 06/18] Update README --- .../connectors/source-apify-dataset/README.md | 4 +++ docs/integrations/sources/apify-dataset.md | 35 +++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 docs/integrations/sources/apify-dataset.md diff --git a/airbyte-integrations/connectors/source-apify-dataset/README.md b/airbyte-integrations/connectors/source-apify-dataset/README.md index 5c83f60071e61..213935cecb904 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/README.md +++ b/airbyte-integrations/connectors/source-apify-dataset/README.md @@ -3,6 +3,10 @@ This is the repository for the Apify Dataset source connector, written in Python. For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/apify-dataset). 
+# About connector +This connector allows you to download data from Apify [dataset](https://docs.apify.com/storage/dataset) to Airbyte. All you need +is Apify dataset ID. + ## Local development ### Prerequisites diff --git a/docs/integrations/sources/apify-dataset.md b/docs/integrations/sources/apify-dataset.md new file mode 100644 index 0000000000000..a280c2ada97ef --- /dev/null +++ b/docs/integrations/sources/apify-dataset.md @@ -0,0 +1,35 @@ +# Apify dataset + +## Overview + + +The Apify dataset connector supports full refresh sync only. + +It can sync data from a single [Apify Dataset](https://docs.apify.com/storage/dataset) by its ID. + +### Output schema + +Since the dataset items do not have strongly typed schema, they are synced as objects, without any assumption on their content. + +### Features + +| Feature | Supported? | | :--- | :--- | | Full Refresh Sync | Yes | | Incremental Sync | No | + +### Performance considerations + +The Apify dataset connector uses [Apify Python Client](https://docs.apify.com/apify-client-python) under the hood and should handle any API limitations under normal usage. + +## Getting started + +### Requirements + +* Dataset Id +* Amplitude Secret Key + +### Setup guide + +Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information). 
+ From 8d998b29f038a3a24029d832adc930dcb81fb5e8 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 10:47:51 +0200 Subject: [PATCH 07/18] Add docs to summary --- docs/SUMMARY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 083b54386edeb..100b5463d6cf5 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -29,6 +29,7 @@ * [Sources](integrations/sources/README.md) * [Amazon Seller Partner](integrations/sources/amazon-seller-partner.md) * [Amplitude](integrations/sources/amplitude.md) + * [Apify Dataset](integrations/sources/apify-dataset.md) * [Appstore](integrations/sources/appstore.md) * [Asana](integrations/sources/asana.md) * [AWS CloudTrail](integrations/sources/aws-cloudtrail.md) From 946cc2bcab098321ff8f75e9a300b4fede5d4db5 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 10:49:33 +0200 Subject: [PATCH 08/18] Add changelog to readme --- docs/integrations/sources/apify-dataset.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/integrations/sources/apify-dataset.md b/docs/integrations/sources/apify-dataset.md index a280c2ada97ef..f123736572044 100644 --- a/docs/integrations/sources/apify-dataset.md +++ b/docs/integrations/sources/apify-dataset.md @@ -33,3 +33,7 @@ The Apify dataset connector uses [Apify Python Client](https://docs.apify.com/ap Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information). 
+### Changelog +| Version | Date | Pull Request | Subject | +| :------ | :-------- | :----- | :------ | +| 0.1.0 | 2021-07-29 | [PR#](https://github.com/airbytehq/airbyte/pull/PR#) | Initial version of the connector | \ No newline at end of file From 3ad2d8750aeef20507263dcfd1d876076a5bc2d7 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 11:06:42 +0200 Subject: [PATCH 09/18] Add link to README --- docs/integrations/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 0ebbfb5b2e027..603b48ed42450 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -14,6 +14,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex |----|----| |[Amazon Seller Partner](./sources/amazon-seller-partner.md)| Alpha | |[Amplitude](./sources/amplitude.md)| Beta | +|[Apify Dataset](./sources/apify-dataset.md)| Alpha | |[Appstore](./sources/appstore.md)| Alpha | |[Asana](./sources/asana.md) | Beta | |[AWS CloudTrail](./sources/aws-cloudtrail.md)| Beta | From 7d0b25b1c21a9b785b2d90622973dabee22a9af2 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 11:08:17 +0200 Subject: [PATCH 10/18] Add PR link --- docs/integrations/sources/apify-dataset.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/apify-dataset.md b/docs/integrations/sources/apify-dataset.md index f123736572044..0ec91801d00d8 100644 --- a/docs/integrations/sources/apify-dataset.md +++ b/docs/integrations/sources/apify-dataset.md @@ -36,4 +36,4 @@ Please read [How to get your API key and Secret key](https://help.amplitude.com/ ### Changelog | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| 0.1.0 | 2021-07-29 | [PR#](https://github.com/airbytehq/airbyte/pull/PR#) | Initial version of the connector | \ No newline at end of file +| 0.1.0 | 2021-07-29 | 
[PR#5069](https://github.com/airbytehq/airbyte/pull/5069) | Initial version of the connector | \ No newline at end of file From 445b9cbb1d65b410c279c849aa8202ea8af0630b Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 15:39:03 +0200 Subject: [PATCH 11/18] Address comments --- .../47f17145-fe20-4ef5-a548-e29b048adf84.json | 7 +++++++ airbyte-integrations/builds.md | 1 + .../connectors/source-apify-dataset/build.gradle | 2 +- .../integration_tests/configured_catalog.json | 1 - .../connectors/source-apify-dataset/setup.py | 2 +- .../source-apify-dataset/source_apify_dataset/source.py | 3 ++- .../source-apify-dataset/source_apify_dataset/spec.json | 2 +- docs/integrations/sources/apify-dataset.md | 1 - 8 files changed, 13 insertions(+), 6 deletions(-) create mode 100644 airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json new file mode 100644 index 0000000000000..fdee14b0ee83b --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json @@ -0,0 +1,7 @@ +{ + "sourceDefinitionId": "47f17145-fe20-4ef5-a548-e29b048adf84", + "name": "Apify Dataset", + "dockerRepository": "airbyte/source-apify-dataset", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/sources/apify-dataset" + } \ No newline at end of file diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index 720e764ad5d31..3334bb38916a8 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -6,6 +6,7 @@ | :--- | :--- | | Amazon Seller Partner | 
[![source-amazon-seller-partner](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-amazon-seller-partner%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-amazon-seller-partner) | | Amplitude | [![source-amplitude](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-amplitude%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-amplitude) | +| Apify Dataset | [![source-apify-dataset](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-apify-dataset%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-apify-dataset) | | AppsFlyer | [![source-braintree-singer](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-appsflyer-singer%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-appsflyer-singer) | | App Store | [![source-appstore-singer](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-appstore-singer%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-appstore-singer) | | Asana | [![source-asana](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-asana%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-asana) | diff --git a/airbyte-integrations/connectors/source-apify-dataset/build.gradle b/airbyte-integrations/connectors/source-apify-dataset/build.gradle index 804520b6049d9..c33a7a8f87db0 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/build.gradle +++ b/airbyte-integrations/connectors/source-apify-dataset/build.gradle @@ -5,7 +5,7 @@ plugins { } airbytePython { - moduleDirectory 'source_apify_dataset_singer' + moduleDirectory 'source_apify_dataset' } dependencies { diff --git 
a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json index 6f193ef8c6322..098c99566dfb7 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-apify-dataset/integration_tests/configured_catalog.json @@ -13,4 +13,3 @@ } }] } - \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-apify-dataset/setup.py b/airbyte-integrations/connectors/source-apify-dataset/setup.py index a90c2c67e1c99..b4732da974e69 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/setup.py +++ b/airbyte-integrations/connectors/source-apify-dataset/setup.py @@ -25,7 +25,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk", "apify-client"] +MAIN_REQUIREMENTS = ["airbyte-cdk", "apify-client~=0.0.1"] TEST_REQUIREMENTS = [ "pytest~=6.1", diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index 5c49dc46f1f11..ce79a8c893b3a 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -68,9 +68,10 @@ def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: dataset = ApifyClient().dataset(dataset_id).get() if dataset is None: raise ValueError(f"Dataset {dataset_id} does not exist") - return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}") + else: + return AirbyteConnectionStatus(status=Status.SUCCEEDED) def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: """ diff --git 
a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json index 294e22aac155a..0efa316061d2f 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/spec.json @@ -1,5 +1,5 @@ { - "documentationUrl": "https://docsurl.com", + "documentationUrl": "https://docs.airbyte.io/integrations/sources/apify-dataset", "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "Apify Dataset Spec", diff --git a/docs/integrations/sources/apify-dataset.md b/docs/integrations/sources/apify-dataset.md index 0ec91801d00d8..1320123c2389e 100644 --- a/docs/integrations/sources/apify-dataset.md +++ b/docs/integrations/sources/apify-dataset.md @@ -27,7 +27,6 @@ The Apify dataset connector uses [Apify Python Client](https://docs.apify.com/ap ### Requirements * Dataset Id -* Amplitude Secret Key ### Setup guide From 6187b560d275ba0b6a8fcf740370042f91e26a80 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Thu, 29 Jul 2021 17:01:50 +0200 Subject: [PATCH 12/18] Add newline --- .../47f17145-fe20-4ef5-a548-e29b048adf84.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json index fdee14b0ee83b..f1a7e717d6e22 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json @@ -4,4 +4,5 @@ "dockerRepository": "airbyte/source-apify-dataset", "dockerImageTag": "0.1.0", "documentationUrl": 
"https://docs.airbyte.io/integrations/sources/apify-dataset" - } \ No newline at end of file + } + \ No newline at end of file From dacf97866c166ec090e33bb2cfebfd6fb8124c09 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Mon, 2 Aug 2021 10:58:24 +0200 Subject: [PATCH 13/18] Docs nits --- .../source-apify-dataset/source_apify_dataset/source.py | 4 ++-- docs/integrations/sources/apify-dataset.md | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index ce79a8c893b3a..0c7dd0bbdc7b4 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -113,8 +113,8 @@ def read( the properties of the spec.json file :param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog returned by discover(), but - in addition, it's been configured in the UI! For each particular stream and field, there may have been provided - with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc + in addition, it's been configured in the UI! For each particular stream and field, there may have been provided + with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc :param state: When a Airbyte reads data from a source, it might need to keep a checkpoint cursor to resume replication in the future from that saved checkpoint. 
This is the object that is provided with state from previous runs and avoid replicating the entire set of diff --git a/docs/integrations/sources/apify-dataset.md b/docs/integrations/sources/apify-dataset.md index 1320123c2389e..17ce004ef7df9 100644 --- a/docs/integrations/sources/apify-dataset.md +++ b/docs/integrations/sources/apify-dataset.md @@ -28,10 +28,6 @@ The Apify dataset connector uses [Apify Python Client](https://docs.apify.com/ap * Dataset Id -### Setup guide - -Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information). - ### Changelog | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | From b7db294bc44a50e6354a2557f1576916c948d188 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Mon, 2 Aug 2021 11:09:24 +0200 Subject: [PATCH 14/18] Make sure that dataset items come in the correct order --- .../source_apify_dataset/source.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index 0c7dd0bbdc7b4..8b23fe28284f6 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -134,13 +134,15 @@ def read( dataset = dataset_client.get() num_items = dataset["itemCount"] - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [] - for offset in range(0, num_items, BATCH_SIZE): - limit = BATCH_SIZE - futures.append(executor.submit(dataset_client.list_items, offset=offset, limit=limit, clean=clean)) - for future in concurrent.futures.as_completed(futures): - for data in future.result().items: + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as 
executor: + apify_get_items_request = lambda offset: dataset_client.list_items( + offset=offset, + limit=BATCH_SIZE, + clean=clean + ) + + for result in executor.map(apify_get_items_request, range(0, num_items, BATCH_SIZE)): + for data in result.items: yield AirbyteMessage( type=Type.RECORD, record=AirbyteRecordMessage( From 928ad9b5ac9be6a31ce76adc8075da22b938df7f Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Mon, 2 Aug 2021 11:25:43 +0200 Subject: [PATCH 15/18] lint --- .../source-apify-dataset/source_apify_dataset/source.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index 8b23fe28284f6..bcea73bb2eb68 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -135,11 +135,7 @@ def read( num_items = dataset["itemCount"] with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - apify_get_items_request = lambda offset: dataset_client.list_items( - offset=offset, - limit=BATCH_SIZE, - clean=clean - ) + apify_get_items_request = lambda offset: dataset_client.list_items(offset=offset, limit=BATCH_SIZE, clean=clean) for result in executor.map(apify_get_items_request, range(0, num_items, BATCH_SIZE)): for data in result.items: From 766d59ec0d56aa8c523440856dff11e95408b4f0 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Mon, 2 Aug 2021 12:03:05 +0200 Subject: [PATCH 16/18] User partial function --- .../source_apify_dataset/source.py | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index bcea73bb2eb68..d028d9daf6c8a 100644 --- 
a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -22,8 +22,8 @@ # SOFTWARE. # - import concurrent.futures +from functools import partial import json from datetime import datetime from typing import Dict, Generator @@ -48,6 +48,19 @@ # Batch size for downloading dataset items from Apify dataset BATCH_SIZE = 50000 +def apify_get_dataset_items(dataset_client, clean, offset): + """ + Wrapper around Apify dataset client that returns a single page with dataset items. + This function needs to be defined explicitly so it can be called in parallel in the main read function. + + :param dataset_client: Apify dataset client + :param clean: whether to fetch only clean items (clean are non-empty ones excluding hidden columns) + :param offset: page offset + + :return: dictionary where .items field contains the fetched dataset items + """ + return dataset_client.list_items(offset=offset, limit=BATCH_SIZE, clean=clean) + class SourceApifyDataset(Source): def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: @@ -134,10 +147,8 @@ def read( dataset = dataset_client.get() num_items = dataset["itemCount"] - with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - apify_get_items_request = lambda offset: dataset_client.list_items(offset=offset, limit=BATCH_SIZE, clean=clean) - - for result in executor.map(apify_get_items_request, range(0, num_items, BATCH_SIZE)): + with concurrent.futures.ThreadPoolExecutor() as executor: + for result in executor.map(partial(apify_get_dataset_items, dataset_client, clean), range(0, num_items, BATCH_SIZE)): for data in result.items: yield AirbyteMessage( type=Type.RECORD, From 7a642a4c919ae5c81f12733f2190220f284a1b85 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Mon, 2 Aug 2021 12:04:21 +0200 Subject: [PATCH 17/18] lint --- .../source-apify-dataset/source_apify_dataset/source.py | 
3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index d028d9daf6c8a..9be143264302c 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -23,9 +23,9 @@ # import concurrent.futures -from functools import partial import json from datetime import datetime +from functools import partial from typing import Dict, Generator from airbyte_cdk.logger import AirbyteLogger @@ -48,6 +48,7 @@ # Batch size for downloading dataset items from Apify dataset BATCH_SIZE = 50000 + def apify_get_dataset_items(dataset_client, clean, offset): """ Wrapper around Apify dataset client that returns a single page with dataset items. From a1b153d2990cf4ab054531cca0cd2645d57bbb19 Mon Sep 17 00:00:00 2001 From: Matej Hamas Date: Mon, 2 Aug 2021 17:47:01 +0200 Subject: [PATCH 18/18] Address comments: --- .../47f17145-fe20-4ef5-a548-e29b048adf84.json | 3 +-- .../source_apify_dataset/source.py | 25 +++++++++---------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json index f1a7e717d6e22..fdee14b0ee83b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/47f17145-fe20-4ef5-a548-e29b048adf84.json @@ -4,5 +4,4 @@ "dockerRepository": "airbyte/source-apify-dataset", "dockerImageTag": "0.1.0", "documentationUrl": "https://docs.airbyte.io/integrations/sources/apify-dataset" - } - \ No newline at 
end of file + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py index 9be143264302c..2fcc86e2caeaf 100644 --- a/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py +++ b/airbyte-integrations/connectors/source-apify-dataset/source_apify_dataset/source.py @@ -49,21 +49,20 @@ BATCH_SIZE = 50000 -def apify_get_dataset_items(dataset_client, clean, offset): - """ - Wrapper around Apify dataset client that returns a single page with dataset items. - This function needs to be defined explicitly so it can be called in parallel in the main read function. - - :param dataset_client: Apify dataset client - :param clean: whether to fetch only clean items (clean are non-empty ones excluding hidden columns) - :param offset: page offset +class SourceApifyDataset(Source): + def _apify_get_dataset_items(self, dataset_client, clean, offset): + """ + Wrapper around Apify dataset client that returns a single page with dataset items. + This function needs to be defined explicitly so it can be called in parallel in the main read function. - :return: dictionary where .items field contains the fetched dataset items - """ - return dataset_client.list_items(offset=offset, limit=BATCH_SIZE, clean=clean) + :param dataset_client: Apify dataset client + :param clean: whether to fetch only clean items (clean are non-empty ones excluding hidden columns) + :param offset: page offset + :return: dictionary where .items field contains the fetched dataset items + """ + return dataset_client.list_items(offset=offset, limit=BATCH_SIZE, clean=clean) -class SourceApifyDataset(Source): def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: """ Tests if the input configuration can be used to successfully connect to the Apify integration. 
@@ -149,7 +148,7 @@ def read( num_items = dataset["itemCount"] with concurrent.futures.ThreadPoolExecutor() as executor: - for result in executor.map(partial(apify_get_dataset_items, dataset_client, clean), range(0, num_items, BATCH_SIZE)): + for result in executor.map(partial(self._apify_get_dataset_items, dataset_client, clean), range(0, num_items, BATCH_SIZE)): for data in result.items: yield AirbyteMessage( type=Type.RECORD,