diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json index cd73c479261a5..addc1ddf76170 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "12928b32-bf0a-4f1e-964f-07e12e37153a", "name": "Mixpanel", "dockerRepository": "airbyte/source-mixpanel", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mixpanel", "icon": "mixpanel.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 53d51637789f9..11082ad6991b3 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -328,7 +328,7 @@ - name: Mixpanel sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a dockerRepository: airbyte/source-mixpanel - dockerImageTag: 0.1.1 + dockerImageTag: 0.1.2 documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel icon: mixpanel.svg sourceType: api diff --git a/airbyte-integrations/connectors/source-mixpanel/Dockerfile b/airbyte-integrations/connectors/source-mixpanel/Dockerfile index d77882fdd02dd..6985afaf20690 100644 --- a/airbyte-integrations/connectors/source-mixpanel/Dockerfile +++ b/airbyte-integrations/connectors/source-mixpanel/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-mixpanel diff --git a/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml index 974e60c5e2591..a198296243479 100644 --- a/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml @@ -14,9 +14,11 @@ tests: basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" + timeout_seconds: 3600 full_refresh: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" + timeout_seconds: 3600 incremental: # incremental streams Funnels, Revenue, Export # Funnels - fails because it has complex state, like {'funnel_idX': {'date': 'dateX'}} @@ -29,4 +31,5 @@ tests: cursor_paths: revenue: ["date"] export: ["date"] + timeout_seconds: 3600 diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index fc1ce3a78fa58..894301292adc7 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -27,13 +27,7 @@ class MixpanelStream(HttpStream, ABC): A maximum of 5 concurrent queries 400 queries per hour. - API Rate Limit Handler: - If total number of planned requests is lower than it is allowed per hour - then - reset reqs_per_hour_limit and send requests with small delay (1 reqs/sec) - because API endpoint accept requests bursts up to 3 reqs/sec - else - send requests with planned delay: 3600/reqs_per_hour_limit seconds + API Rate Limit Handler: after each request freeze for the time period: 3600/reqs_per_hour_limit seconds """ @property @@ -82,7 +76,7 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi self.logger.error(f"Stream {self.name}: {e.response.status_code} {e.response.reason} - {error_message}") raise e - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: json_response = response.json() if self.data_field is not None: data = json_response.get(self.data_field, []) @@ -94,6 +88,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp for record in data: yield record + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + + # parse the whole response + yield from self.process_response(response, **kwargs) + # wait for X seconds to match API limitations time.sleep(3600 / self.reqs_per_hour_limit) @@ -190,10 +189,6 @@ def stream_slices( # add 1 additional day because date range is inclusive start_date = end_date + timedelta(days=1) - # reset reqs_per_hour_limit if we expect less requests (1 req per stream) than it is allowed by API reqs_per_hour_limit - if len(date_slices) < self.reqs_per_hour_limit: - self.reqs_per_hour_limit = 3600 # 1 query per sec - return date_slices def request_params( @@ -269,9 +264,6 @@ def stream_slices( for date_slice in date_slices: stream_slices.append({**funnel_slice, **date_slice}) - # reset reqs_per_hour_limit if we expect less requests (1 req per stream) than it is allowed by API reqs_per_hour_limit - if len(stream_slices) < self.reqs_per_hour_limit: - self.reqs_per_hour_limit = 3600 # queries per hour (1 query per sec) return stream_slices def request_params( @@ -288,7 +280,7 @@ def request_params( params["unit"] = "day" return params - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """ response.json() example: { @@ -368,7 +360,7 @@ class EngageSchema(MixpanelStream): def path(self, **kwargs) -> str: return "engage/properties" - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """ response.json() example: { @@ -444,7 +436,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, self._total = None return None - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """ { "page": 0 @@ -591,7 +583,7 @@ class Revenue(DateSlicesMixin, IncrementalMixpanelStream): def path(self, **kwargs) -> str: return "engage/revenue" - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """ response.json() example: { @@ -634,7 +626,7 @@ class ExportSchema(MixpanelStream): def path(self, **kwargs) -> str: return "events/properties/top" - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[str]: + def process_response(self, response: requests.Response, **kwargs) -> Iterable[str]: """ response.json() example: { @@ -691,7 +683,7 @@ def url_base(self): def path(self, **kwargs) -> str: return "export" - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """Export API return response.text in JSONL format but each line is a valid JSON object Raw item example: { @@ -737,8 +729,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield item - # wait for X seconds to meet API limitation - time.sleep(3600 / self.reqs_per_hour_limit) def get_json_schema(self) -> Mapping[str, Any]: """ diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 4c3e3dc1ce984..50713de707e0c 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -56,6 +56,7 @@ Select the correct region \(EU or US\) for your Mixpanel project. See detail [he | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| `0.1.2` | 2021-11-02 | [7439](https://github.com/airbytehq/airbyte/issues/7439) | Added delay for all streams to match API limitation of requests rate | | `0.1.1` | 2021-09-16 | [6075](https://github.com/airbytehq/airbyte/issues/6075) | Added option to select project region | | `0.1.0` | 2021-07-06 | [3698](https://github.com/airbytehq/airbyte/issues/3698) | created CDK native mixpanel connector |