Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add end of pipe clean up hook to Sinks #750

Merged
merged 8 commits into from
Jun 24, 2022
9 changes: 9 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,12 @@ def activate_version(self, new_version: int) -> None:
"ACTIVATE_VERSION message received but not implemented by this target. "
"Ignoring."
)

@abc.abstractmethod
def clean_up(self) -> None:
"""Perform any clean up actions required at end of a stream.

Implementations should ensure that clean up does not affect resources
that may be in use from other instances of the same sink. Stream name alone
should not be relied on, it's recommended to use a uuid as well."""
pass
z3z1ma marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 6 additions & 2 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _process_lines(self, file_input: IO[str]) -> Counter[str]:

def _process_endofpipe(self) -> None:
"""Called after all input lines have been read."""
self.drain_all()
self.drain_all(is_endofpipe=True)

def _process_record_message(self, message_dict: dict) -> None:
"""Process a RECORD message.
Expand Down Expand Up @@ -403,15 +403,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None:
# Sink drain methods

@final
def drain_all(self) -> None:
def drain_all(self, is_endofpipe=False) -> None:
"""Drains all sinks, starting with those cleared due to changed schema.

This method is internal to the SDK and should not need to be overridden.
"""
state = copy.deepcopy(self._latest_state)
self._drain_all(self._sinks_to_clear, 1)
if is_endofpipe:
(sink.clean_up() for sink in self._sinks_active.values())
self._sinks_to_clear = []
self._drain_all(list(self._sinks_active.values()), self.max_parallelism)
if is_endofpipe:
(sink.clean_up() for sink in self._sinks_to_clear)
self._write_state_message(state)
self._reset_max_record_age()

Expand Down