Skip to content
This repository has been archived by the owner on Aug 23, 2024. It is now read-only.

Prevent database connection leaks during batch web requests #3

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
45 changes: 27 additions & 18 deletions webscraperequest/batchrequest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest import mock # noqa

from django.db import models
from django.db import connection

try:
from .scraperequest import *
Expand All @@ -18,6 +19,24 @@
logger = logging.getLogger(__name__)


def _request_callback(future, model_instance, field_name, requester, index=None) -> None:
    """
    Record the outcome of one finished request on the model and save it.

    Intended to be attached to a future with ``add_done_callback`` (with the
    keyword arguments pre-bound via ``functools.partial``).

    :param future: the completed future; ``future.result()`` is expected to
        return a dict describing the request outcome
    :param model_instance: object whose ``__dict__[str(field_name)][index]``
        entry is updated with the result and on which ``save()`` is called
    :param field_name: name of the attribute holding the per-request entries
    :param requester: only used for the debug log message (its class)
    :param index: position of this request's entry inside the field
    """
    # Bind ``result`` up front: if ``future.result()`` raises, the error and
    # cleanup paths below still need a dict to record into. Without this they
    # would fail with UnboundLocalError and the failure would never be stored.
    result = {}
    try:
        try:
            result = future.result()
            result['success'] = result.get('success', True)
            logger.debug('{} result inserted into database'.format(requester.__class__))
        except Exception as e:
            if not isinstance(result, dict):
                # an unexpected non-dict result cannot carry the error details
                result = {}
            result['success'] = result.get('success', False)
            result['error'] = getattr(e, 'message', str(e))
            logger.error('Error inserting into index {}: {}'.format(index, str(e)))
        finally:
            result['end_time'] = int(time.time())
            model_instance.__dict__[str(field_name)][index].update(result)
            model_instance.save()
    finally:
        # explicitly close the connection inside the thread to prevent leaked
        # connections; done in an outer ``finally`` so that a failing update or
        # save() cannot skip it
        connection.close()


class BaseBatchWebRequest:
"""
Manages a parallel batch of web requests. Intermediate and final results are
Expand Down Expand Up @@ -48,8 +67,14 @@ def start(self, largs: List[list], keys: List[int] = []) -> None:
pool = ThreadPoolExecutor(self.max_workers)
for i, args in enumerate(largs):
logger.debug('{} submitted to thread pool'.format(self.requester.__class__))
pool.submit(self._request, args).add_done_callback(
functools.partial(self._insert, index=i))
callback = functools.partial(
_request_callback,
model_instance=self.model_instance,
field_name=self.field_name,
requester=self.requester,
index=i
)
pool.submit(self._request, args).add_done_callback(callback)
pool.shutdown(wait=False)

def get_batch_status(self) -> 'BatchStatus': # forward ref for typing
Expand Down Expand Up @@ -97,22 +122,6 @@ def _request(self, args: list) -> Dict[str, Any]:
'error': getattr(e, 'message', str(e)),
}

def _insert(self, future, index=None) -> None:
    """
    Store the outcome of one finished request on the model and save it.

    Used as a done-callback for the futures submitted to the thread pool.

    :param future: the completed future; ``future.result()`` is expected to
        return a dict describing the request outcome
    :param index: position of this request's entry inside
        ``self.model_instance.__dict__[str(self.field_name)]``
    """
    # Bind ``result`` before the try: if ``future.result()`` raises, the
    # ``except`` and ``finally`` suites still need a dict to write into
    # (otherwise they fail with UnboundLocalError and nothing is recorded).
    result = {}
    try:
        result = future.result()
        result['success'] = result.get('success', True)
        logger.debug('{} result inserted into database'.format(self.requester.__class__))
    except Exception as e:
        logger.error('Error inserting into index {}: {}'
                     .format(index, str(e)))
        if not isinstance(result, dict):
            # an unexpected non-dict result cannot carry the error details
            result = {}
        result['success'] = result.get('success', False)
        result['error'] = getattr(e, 'message', str(e))
    finally:
        result['end_time'] = int(time.time())
        self.model_instance.__dict__[str(self.field_name)][index].update(result)
        self.model_instance.save()


class BatchStatus:

def __init__(
Expand Down