Skip to content

Commit

Permalink
🐛 source mixpanel: match API limitation of requests rate (#7439)
Browse files Browse the repository at this point in the history
* Added delay to for all streams, removed logic which increase reqs rate because it does not take into consideration actual number of requests made in previous and next streams.

* Fixed argmument passing

* Increased timeout for SAT

* Increased timeout for SAT

* bump version

* bumped connector version, updated change log

Co-authored-by: Marcos Marx <[email protected]>
  • Loading branch information
midavadim and marcosmarxm authored Nov 2, 2021
1 parent 5011226 commit 6285d31
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "12928b32-bf0a-4f1e-964f-07e12e37153a",
"name": "Mixpanel",
"dockerRepository": "airbyte/source-mixpanel",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mixpanel",
"icon": "mixpanel.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@
- name: Mixpanel
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
icon: mixpanel.svg
sourceType: api
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 3600
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 3600
incremental:
# incremental streams Funnels, Revenue, Export
# Funnels - fails because it has complex state, like {'funnel_idX': {'date': 'dateX'}}
Expand All @@ -29,4 +31,5 @@ tests:
cursor_paths:
revenue: ["date"]
export: ["date"]
timeout_seconds: 3600

Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ class MixpanelStream(HttpStream, ABC):
A maximum of 5 concurrent queries
400 queries per hour.
API Rate Limit Handler:
If total number of planned requests is lower than it is allowed per hour
then
reset reqs_per_hour_limit and send requests with small delay (1 reqs/sec)
because API endpoint accept requests bursts up to 3 reqs/sec
else
send requests with planned delay: 3600/reqs_per_hour_limit seconds
API Rate Limit Handler: after each request freeze for the time period: 3600/reqs_per_hour_limit seconds
"""

@property
Expand Down Expand Up @@ -82,7 +76,7 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi
self.logger.error(f"Stream {self.name}: {e.response.status_code} {e.response.reason} - {error_message}")
raise e

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
json_response = response.json()
if self.data_field is not None:
data = json_response.get(self.data_field, [])
Expand All @@ -94,6 +88,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
for record in data:
yield record

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:

# parse the whole response
yield from self.process_response(response, **kwargs)

# wait for X seconds to match API limitations
time.sleep(3600 / self.reqs_per_hour_limit)

Expand Down Expand Up @@ -190,10 +189,6 @@ def stream_slices(
# add 1 additional day because date range is inclusive
start_date = end_date + timedelta(days=1)

# reset reqs_per_hour_limit if we expect less requests (1 req per stream) than it is allowed by API reqs_per_hour_limit
if len(date_slices) < self.reqs_per_hour_limit:
self.reqs_per_hour_limit = 3600 # 1 query per sec

return date_slices

def request_params(
Expand Down Expand Up @@ -269,9 +264,6 @@ def stream_slices(
for date_slice in date_slices:
stream_slices.append({**funnel_slice, **date_slice})

# reset reqs_per_hour_limit if we expect less requests (1 req per stream) than it is allowed by API reqs_per_hour_limit
if len(stream_slices) < self.reqs_per_hour_limit:
self.reqs_per_hour_limit = 3600 # queries per hour (1 query per sec)
return stream_slices

def request_params(
Expand All @@ -288,7 +280,7 @@ def request_params(
params["unit"] = "day"
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
response.json() example:
{
Expand Down Expand Up @@ -368,7 +360,7 @@ class EngageSchema(MixpanelStream):
def path(self, **kwargs) -> str:
return "engage/properties"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
response.json() example:
{
Expand Down Expand Up @@ -444,7 +436,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
self._total = None
return None

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
{
"page": 0
Expand Down Expand Up @@ -591,7 +583,7 @@ class Revenue(DateSlicesMixin, IncrementalMixpanelStream):
def path(self, **kwargs) -> str:
return "engage/revenue"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
response.json() example:
{
Expand Down Expand Up @@ -634,7 +626,7 @@ class ExportSchema(MixpanelStream):
def path(self, **kwargs) -> str:
return "events/properties/top"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[str]:
def process_response(self, response: requests.Response, **kwargs) -> Iterable[str]:
"""
response.json() example:
{
Expand Down Expand Up @@ -691,7 +683,7 @@ def url_base(self):
def path(self, **kwargs) -> str:
return "export"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""Export API return response.text in JSONL format but each line is a valid JSON object
Raw item example:
{
Expand Down Expand Up @@ -737,8 +729,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp

yield item

# wait for X seconds to meet API limitation
time.sleep(3600 / self.reqs_per_hour_limit)

def get_json_schema(self) -> Mapping[str, Any]:
"""
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Select the correct region \(EU or US\) for your Mixpanel project. See detail [he

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| `0.1.2` | 2021-11-02 | [7439](https://github.com/airbytehq/airbyte/issues/7439) | Added delay for all streams to match API limitation of requests rate |
| `0.1.1` | 2021-09-16 | [6075](https://github.com/airbytehq/airbyte/issues/6075) | Added option to select project region |
| `0.1.0` | 2021-07-06 | [3698](https://github.com/airbytehq/airbyte/issues/3698) | created CDK native mixpanel connector |

0 comments on commit 6285d31

Please sign in to comment.