diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 650e2f6619158..facba8fd0e592 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -371,16 +371,26 @@ def close(self, reason=None): _LOGGER.debug("Stopping scheduler.") self._scheduler.shutdown() self._scheduler = None + + # Leaser and dispatcher reference each other through the shared + # StreamingPullManager instance, i.e. "self", thus do not set their + # references to None until both have been shut down. + # + # NOTE: Even if the dispatcher operates on an inactive leaser using + # the latter's add() and remove() methods, these have no impact on + # the stopped leaser (the leaser is never again re-started). Ditto + # for the manager's maybe_resume_consumer() / maybe_pause_consumer(), + # because the consumer gets shut down first. _LOGGER.debug("Stopping leaser.") self._leaser.stop() - self._leaser = None _LOGGER.debug("Stopping dispatcher.") self._dispatcher.stop() self._dispatcher = None + # dispatcher terminated, OK to dispose the leaser reference now + self._leaser = None _LOGGER.debug("Stopping heartbeater.") self._heartbeater.stop() self._heartbeater = None - self._rpc = None self._closed = True _LOGGER.debug("Finished stopping manager.")