Framework v0.1 #2634
State handling is a bit confusing
The responsibilities of each method for updating/outputting state don't feel obvious while reading through the code.
Typing
There are a lot of Dicts everywhere and it makes it hard to read and very comment-dependent to understand the code. I feel like a lot of this could be improved by aliasing (configs, different JSON-style dicts, etc).
Performance
How does the performance of this compare to the singer Stripe source? I'd like to make sure the out-of-the-box behavior is comparable.
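On the Typing point above, a minimal sketch of what aliasing could look like. The alias names (`Config`, `StreamState`, `Record`) and the toy `read_records` function are hypothetical and not taken from this PR; they only illustrate how named types can replace bare `Dict` annotations.

```python
from typing import Any, Dict, Iterable, Mapping

# Hypothetical alias names, chosen for illustration only.
Config = Mapping[str, Any]       # user-supplied connector configuration
StreamState = Dict[str, Any]     # per-stream state object
Record = Mapping[str, Any]       # a single JSON-style record


def read_records(config: Config, state: StreamState) -> Iterable[Record]:
    """Toy reader: emits records whose id is greater than the saved cursor."""
    last_seen = state.get("last_id", 0)
    for record_id in range(1, config["max_id"] + 1):
        if record_id > last_seen:
            yield {"id": record_id}
```

With aliases like these, a signature says which kind of dict it expects without relying on a comment.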
        :param response:
        :return: An iterable containing the parsed response
        """
        yield [response]
Feels strange to need to override these different methods when really we're trying to go from requests.Response -> Iterable[Dict].
Maybe we could use types to show how these are intended to chain together?
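One way the chaining could be made visible through types, as a rough sketch only. `FakeResponse` is a stand-in for `requests.Response` so the example runs without the library, and `ResponseParser`, `RecordTransform`, `parse_data_field`, and `to_records` are hypothetical names that do not appear in the PR.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]


@dataclass
class FakeResponse:
    """Stand-in for requests.Response so the sketch runs offline."""
    payload: Dict[str, Any]

    def json(self) -> Dict[str, Any]:
        return self.payload


# Each stage gets an explicit type, so the intended chain is readable
# from the signatures alone: response -> records -> transformed records.
ResponseParser = Callable[[FakeResponse], Iterable[Record]]
RecordTransform = Callable[[Record], Record]


def parse_data_field(response: FakeResponse) -> Iterable[Record]:
    """Assumes the API wraps its records in a top-level "data" list."""
    yield from response.json().get("data", [])


def to_records(
    response: FakeResponse,
    parse: ResponseParser,
    transforms: List[RecordTransform],
) -> Iterable[Record]:
    """Apply the parser, then each transform in order, to every record."""
    for record in parse(response):
        for transform in transforms:
            record = transform(record)
        yield record
```

A stream would then supply one parser and a list of transforms instead of overriding several loosely related methods.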
        """
        return response.status_code == 429 or 500 <= response.status_code < 600

    def backoff_time(self, response: requests.Response) -> Optional[float]:
Maybe this should specify behavior for a backoff instead of just allowing time? Like attempt number, response, and then allow calculating time? Then someone could do exp. backoff scaled their own way for example.
Also it feels like this should be a Duration equivalent or milliseconds instead of seconds-level.
Discussed in person. Sounds like this will move towards custom decorators/handlers instead of these function overrides.
fwiw this might be a lot cleaner if we can get this merged: litl/backoff#122
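The suggestions in this thread (take the attempt number and the response, return a duration rather than bare seconds) could be sketched roughly as below. `ExponentialBackoff` and its parameters are hypothetical; this is not the design the PR settled on, and it uses plain status codes instead of `requests.Response` so that it runs standalone. The retry condition mirrors the snippet under review.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class ExponentialBackoff:
    """Hypothetical strategy: wait base * factor**(attempt - 1), capped at max_delay."""
    base: timedelta = timedelta(milliseconds=500)
    factor: float = 2.0
    max_delay: timedelta = timedelta(seconds=60)

    def delay(
        self,
        attempt: int,
        status_code: int,
        retry_after: Optional[float] = None,
    ) -> Optional[timedelta]:
        """Return how long to wait before the next attempt, or None to stop retrying."""
        # Same retry condition as the code under review: 429 or any 5xx.
        if not (status_code == 429 or 500 <= status_code < 600):
            return None
        # Prefer a server-provided wait (in seconds) when the caller has one.
        if retry_after is not None:
            return min(timedelta(seconds=retry_after), self.max_delay)
        return min(self.base * (self.factor ** (attempt - 1)), self.max_delay)
```

Returning a `timedelta` sidesteps the seconds-versus-milliseconds question, and a connector could swap in its own scaling by changing `base` and `factor`.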
    def get_updated_state(self, current_state: Mapping[str, Any], latest_record: Mapping[str, Any]):
        """
        Inspect the latest record extracted from the data source and the current state object and return an updated state object.
        It is safe to mutate the input state object and return it.
Why would you want to allow that? We shouldn't encourage that.
especially taking into account that Mapping is immutable (for modification there is MutableMapping). also, the return type is missing.
the alternative is to copy the input state every time (or to create a new state object in this method) which seems not great for performance?
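For reference, the non-mutating alternative discussed in this thread could look like the sketch below. It is written as a free function with a hard-coded, assumed cursor field (`created`) purely for illustration; the PR's actual method lives on the stream class.

```python
from typing import Any, Dict, Mapping


def get_updated_state(
    current_state: Mapping[str, Any],
    latest_record: Mapping[str, Any],
) -> Dict[str, Any]:
    """Return a new state dict; the input mapping is left untouched."""
    cursor_field = "created"  # assumed cursor field name, for illustration only
    latest = latest_record[cursor_field]
    previous = current_state.get(cursor_field, latest)
    # Shallow copy: the cost grows with the number of top-level state keys.
    return {**current_state, cursor_field: max(previous, latest)}
```

The copy is shallow, so its cost depends on how many top-level keys the state holds; whether that matters in practice would need measuring on a real sync.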
cool! added a few thoughts. i'm going to just leave a comment since you are not lacking for reviewers on this one!
class IncrementalStream(Stream, ABC):
    @property
    def cursor_field(self) -> Union[str, List[str]]:
agreed
    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the name of the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
should this method be called default_cursor_field? it sounds like this is only populated if the source is setting the cursor field if i'm reading the comment correctly. or maybe call this method source_defined_cursor and then the method below is just has_source_defined_cursor and the default impl can just check if this method returns null / empty array
I think there is either cursor_field or not, what makes it default_cursor_field? If there is default_cursor_field then there should be actual/current_cursor_field?
also what is the case when we need additional property source_defined_cursor?
isn't cursor_field self sufficient? i.e. it is either null/[] or str/[str]
cursor_field in the general case could be chosen by a user. For example when replicating data from a SQL database, you could choose to replicate based on created_at or updated_at. In this case, because the cursor is configurable by the user, source_defined_cursor would return False even though the cursor field is defined.
- I think base abstraction should include this logic of setting custom cursor field
- I think the name source_defined_cursor could be confusing, maybe something like cursor_field_overriden or has_custom_cursor_field?
- If we have cursor_field and default_cursor_field, wouldn't source_defined_cursor implementation just compare if one is equal to another? Also I still don't see the case when the stream might need this information.
- Added this
- Agreed with you. Unfortunately currently this is at the protocol level so changing this is a heavy lift because it would change all connectors, the UI, and the core API. To make this better I defaulted all HTTP streams to True on this field so the user doesn't need to think about this (most HTTP sources have a static cursor field anyways unlike e.g: databases)
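A rough sketch of the arrangement described in this thread: `cursor_field` names the cursor, while `source_defined_cursor` says whether the user may pick a different one, defaulting to `True`. The base class below is a simplified stand-in rather than the PR's actual `IncrementalStream`, and the `Charges` and `SqlTable` subclasses are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import List, Union


class IncrementalStream(ABC):
    """Simplified stand-in for the class under review."""

    @property
    @abstractmethod
    def cursor_field(self) -> Union[str, List[str]]:
        """Name (or path) of the field used as the cursor."""

    @property
    def source_defined_cursor(self) -> bool:
        # Assumed default: the source fixes the cursor, so users need not choose.
        return True


class Charges(IncrementalStream):
    # Typical HTTP API case: the cursor is static and defined by the source.
    cursor_field = "created"


class SqlTable(IncrementalStream):
    # Database case: a cursor is set, but the user could choose another column.
    cursor_field = "updated_at"
    source_defined_cursor = False
```

This shows why the two properties are not redundant: `SqlTable` has a cursor field yet reports `source_defined_cursor` as `False`.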
Force-pushed from ff14bc2 to 272d4b9
closes #2647
closes #2651
Alpha version of the source framework.
Main additions:
Current features of HTTP stream:
child=<stream> and everything (mostly) wires up correctly.
Reading Order
- source_stripe/source.py for an example source (this could replace the singer Stripe source)
- abstract_source.py for how things all come together
- streams/core.py for the core stream interface
- streams/http.py
- auth/core.py and auth/simple.py for a basic idea of the oauth stuff
- streams/rate_limiting.py