Skip to content

Commit

Permalink
fix: Clean up resources when qdrant not responding while uploading to…
Browse files Browse the repository at this point in the history
… elevate deadlock (#840)
  • Loading branch information
hh-space-invader authored and joein committed Jan 16, 2025
1 parent ac1d535 commit 8507d65
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions qdrant_client/parallel_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def input_queue_iterable() -> Iterable[Any]:
# See:
# https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#pipes-and-queues
# https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#programming-guidelines
input_queue.close()
output_queue.close()
input_queue.join_thread()
output_queue.join_thread()

with num_active_workers.get_lock():
Expand All @@ -90,7 +92,7 @@ def __init__(self, num_workers: int, worker: Type[Worker], start_method: Optiona
self.ctx: BaseContext = get_context(start_method)
self.processes: List[BaseProcess] = []
self.queue_size = self.num_workers * max_internal_batch_size

self.emergency_shutdown = False
self.num_active_workers: Optional[BaseValue] = None

def start(self, **kwargs: Any) -> None:
Expand Down Expand Up @@ -162,15 +164,23 @@ def unordered_map(self, stream: Iterable[Any], *args: Any, **kwargs: Any) -> Ite
finally:
assert self.input_queue is not None, "Input queue is None"
assert self.output_queue is not None, "Output queue is None"
self.join()
self.input_queue.close()
self.output_queue.close()
if self.emergency_shutdown:
self.input_queue.cancel_join_thread()
self.output_queue.cancel_join_thread()
else:
self.input_queue.join_thread()
self.output_queue.join_thread()

def join_or_terminate(self, timeout: Optional[int] = 1) -> None:
"""
Emergency shutdown
@param timeout:
@return:
"""
self.emergency_shutdown = True
for process in self.processes:
process.join(timeout=timeout)
if process.is_alive():
Expand All @@ -193,4 +203,5 @@ def __del__(self) -> None:
https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python/.
"""
for process in self.processes:
process.terminate()
if process.is_alive():
process.terminate()

0 comments on commit 8507d65

Please sign in to comment.