From 485ddf8bbd415cb13995799433ee97798866c18d Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Wed, 22 Jun 2022 17:11:48 -0500 Subject: [PATCH 1/5] add clean up hook to sinks called at end of pipe drain --- singer_sdk/sinks/core.py | 4 ++++ singer_sdk/target_base.py | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 693596799..16ddd263c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -401,3 +401,7 @@ def activate_version(self, new_version: int) -> None: "ACTIVATE_VERSION message received but not implemented by this target. " "Ignoring." ) + + def clean_up(self) -> None: + """Perform any clean up actions required at end of a stream.""" + pass diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index fa400fae7..88ee8ac05 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -273,7 +273,7 @@ def _process_lines(self, file_input: IO[str]) -> Counter[str]: def _process_endofpipe(self) -> None: """Called after all input lines have been read.""" - self.drain_all() + self.drain_all(is_endofpipe=True) def _process_record_message(self, message_dict: dict) -> None: """Process a RECORD message. @@ -403,15 +403,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None: # Sink drain methods @final - def drain_all(self) -> None: + def drain_all(self, is_endofpipe=False) -> None: """Drains all sinks, starting with those cleared due to changed schema. This method is internal to the SDK and should not need to be overridden. 
""" state = copy.deepcopy(self._latest_state) self._drain_all(self._sinks_to_clear, 1) + if is_endofpipe: + (sink.clean_up() for sink in self._sinks_active.values()) self._sinks_to_clear = [] self._drain_all(list(self._sinks_active.values()), self.max_parallelism) + if is_endofpipe: + (sink.clean_up() for sink in self._sinks_to_clear) self._write_state_message(state) self._reset_max_record_age() From 42d17f2902c80e284665aeaf522ec2d9dda6fe54 Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Thu, 23 Jun 2022 11:55:15 -0500 Subject: [PATCH 2/5] make message more clear for devs and add decorator --- singer_sdk/sinks/core.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 16ddd263c..630882c52 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -402,6 +402,11 @@ def activate_version(self, new_version: int) -> None: "Ignoring." ) + @abc.abstractmethod def clean_up(self) -> None: - """Perform any clean up actions required at end of a stream.""" + """Perform any clean up actions required at end of a stream. + + Implementations should ensure that clean up does not affect resources + that may be in use from other instances of the same sink. 
Stream name alone + should not be relied on, it's recommended to use a uuid as well.""" pass From 6118a9faa72343caf51868b4fedc04379a0ecd1b Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Thu, 23 Jun 2022 16:55:26 -0500 Subject: [PATCH 3/5] unwrap the comprehension so mypy doesnt get mad --- singer_sdk/target_base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 88ee8ac05..392db8f3e 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -411,11 +411,13 @@ def drain_all(self, is_endofpipe=False) -> None: state = copy.deepcopy(self._latest_state) self._drain_all(self._sinks_to_clear, 1) if is_endofpipe: - (sink.clean_up() for sink in self._sinks_active.values()) + for sink in self._sinks_to_clear: + sink.clean_up() self._sinks_to_clear = [] self._drain_all(list(self._sinks_active.values()), self.max_parallelism) if is_endofpipe: - (sink.clean_up() for sink in self._sinks_to_clear) + for sink in self._sinks_active.values(): + sink.clean_up() self._write_state_message(state) self._reset_max_record_age() From 3c5fa09fdd757a4eaccdd8da22a4bf528e87ebd9 Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Thu, 23 Jun 2022 16:55:59 -0500 Subject: [PATCH 4/5] remove abstract method --- singer_sdk/sinks/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 630882c52..d0ab563f9 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -402,7 +402,6 @@ def activate_version(self, new_version: int) -> None: "Ignoring." ) - @abc.abstractmethod def clean_up(self) -> None: """Perform any clean up actions required at end of a stream. 
From 714f7608a703a566e419a94de354790b1f38ae5c Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Fri, 24 Jun 2022 10:00:18 -0500 Subject: [PATCH 5/5] fix flake8 errors --- singer_sdk/sinks/core.py | 3 ++- singer_sdk/target_base.py | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index d0ab563f9..1b5a50f0f 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -407,5 +407,6 @@ def clean_up(self) -> None: Implementations should ensure that clean up does not affect resources that may be in use from other instances of the same sink. Stream name alone - should not be relied on, it's recommended to use a uuid as well.""" + should not be relied on; it's recommended to use a uuid as well. + """ pass diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 392db8f3e..dad17f09c 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -403,10 +403,16 @@ def _process_activate_version_message(self, message_dict: dict) -> None: # Sink drain methods @final - def drain_all(self, is_endofpipe=False) -> None: + def drain_all(self, is_endofpipe: bool = False) -> None: """Drains all sinks, starting with those cleared due to changed schema. This method is internal to the SDK and should not need to be overridden. + + Args: + is_endofpipe: Set to True by + :meth:`~singer_sdk.Target._process_endofpipe()`, which + is called after the target instance has finished + reading from stdin. """ state = copy.deepcopy(self._latest_state) self._drain_all(self._sinks_to_clear, 1)