Skip to content

Commit

Permalink
Issue #412 Added thread_pool handler
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmorell committed Nov 19, 2022
1 parent 3cd8e6d commit 9579866
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 3 deletions.
29 changes: 26 additions & 3 deletions rollbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from rollbar.lib import events, filters, dict_merge, parse_qs, text, transport, urljoin, iteritems, defaultJSONEncode


__version__ = '0.16.3'
__version__ = '0.16.4beta'
__log_name__ = 'rollbar'
log = logging.getLogger(__log_name__)

Expand Down Expand Up @@ -124,7 +124,7 @@ def wrap(*args, **kwargs):
from twisted.internet.ssl import CertificateOptions
from twisted.internet import task, defer, ssl, reactor
from zope.interface import implementer

@implementer(IPolicyForHTTPS)
class VerifyHTTPS(object):
def __init__(self):
Expand Down Expand Up @@ -275,7 +275,12 @@ def _get_fastapi_request():
'root': None, # root path to your code
'branch': None, # git branch name
'code_version': None,
'handler': 'default', # 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted' or 'httpx'
# 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted', 'httpx' or 'thread_pool'
# 'async' requires Python 3.4 or higher.
# 'httpx' requires Python 3.7 or higher.
# 'thread_pool' requires Python 3.2 or higher.
'handler': 'default',
'thread_pool_workers': None,
'endpoint': DEFAULT_ENDPOINT,
'timeout': DEFAULT_TIMEOUT,
'agent.log_file': 'log.rollbar',
Expand Down Expand Up @@ -383,6 +388,9 @@ def init(access_token, environment='production', scrub_fields=None, url_fields=N

if SETTINGS.get('handler') == 'agent':
agent_log = _create_agent_log()
elif SETTINGS.get('handler') == 'thread_pool':
from rollbar.lib.thread_pool import init_pool
init_pool(SETTINGS.get('thread_pool_workers', None))

if not SETTINGS['locals']['safelisted_types'] and SETTINGS['locals']['whitelisted_types']:
warnings.warn('whitelisted_types deprecated use safelisted_types instead', DeprecationWarning)
Expand Down Expand Up @@ -523,6 +531,7 @@ def send_payload(payload, access_token):
- 'gae': calls _send_payload_appengine() (which makes a blocking call to Google App Engine)
- 'twisted': calls _send_payload_twisted() (which makes an async HTTP request using Twisted and Treq)
- 'httpx': calls _send_payload_httpx() (which makes an async HTTP request using HTTPX)
- 'thread_pool': uses a pool of worker threads to make HTTP requests off the main thread. Returns immediately.
"""
payload = events.on_payload(payload)
if payload is False:
Expand Down Expand Up @@ -569,6 +578,8 @@ def send_payload(payload, access_token):
_send_payload_async(payload_str, access_token)
elif handler == 'thread':
_send_payload_thread(payload_str, access_token)
elif handler == 'thread_pool':
_send_payload_thread_pool(payload_str, access_token)
else:
# default to 'thread'
_send_payload_thread(payload_str, access_token)
Expand Down Expand Up @@ -1510,6 +1521,18 @@ def _send_payload_thread(payload_str, access_token):
thread.start()


def _send_payload_pool(payload_str, access_token):
try:
_post_api('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _send_payload_thread_pool(payload_str, access_token):
from rollbar.lib.thread_pool import submit
submit(_send_payload_pool, payload_str, access_token)


def _send_payload_appengine(payload_str, access_token):
try:
_post_api_appengine('item/', payload_str, access_token=access_token)
Expand Down
31 changes: 31 additions & 0 deletions rollbar/lib/thread_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging
from concurrent.futures import ThreadPoolExecutor

_pool = None # type: ThreadPoolExecutor|None

log = logging.getLogger(__name__)


def init_pool(max_workers):
"""
Creates the thread pool with the max workers.
:type max_workers: int
"""
global _pool
_pool = ThreadPoolExecutor(max_workers)


def submit(worker, payload_str, access_token):
"""
Submit a new task to the thread pool.
:type worker: function
:type payload_str: str
:type access_token: str
"""
global _pool
if _pool is None:
log.warning('pyrollbar: Thead pool not initialized. Please ensure init_pool() is called prior to submit().')
return
_pool.submit(worker, payload_str, access_token)
26 changes: 26 additions & 0 deletions rollbar/test/test_rollbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,32 @@ def _raise():

send_payload_httpx.assert_called_once()

@unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+')
@mock.patch('rollbar._send_payload_thread_pool')
def test_thread_pool_handler(self, send_payload_thread_pool):
def _raise():
try:
raise Exception('foo')
except:
rollbar.report_exc_info()
rollbar.SETTINGS['handler'] = 'thread_pool'
_raise()

send_payload_thread_pool.assert_called_once()

@unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+')
def test_thread_pool_submit(self):
from rollbar.lib.thread_pool import init_pool, submit
init_pool(1)
ran = {'nope': True} # dict used so it is not shadowed in run

def run(payload_str, access_token):
ran['nope'] = False

submit(run, 'foo', 'bar')
self.assertFalse(ran['nope'])


@mock.patch('rollbar.send_payload')
def test_args_constructor(self, send_payload):

Expand Down

0 comments on commit 9579866

Please sign in to comment.