Added multiprocessing #57
@@ -0,0 +1,7 @@
Version 0.1
-----------
- 0.1.4 - Multiprocessing
- 0.1.3 - Blueprint support
- 0.1.1 - 0.1.2 - Struggling to update pypi via CI

Released to public.
@@ -0,0 +1,35 @@
# Deploying

When it comes to deploying Sanic, there's not much to it, but there are
a few things to take note of.

## Workers

By default, Sanic listens in the main process using only one CPU core.
To crank up the juice, just specify the number of workers in the run
arguments like so:

```python
app.run(host='0.0.0.0', port=1337, workers=4)
```

Sanic will automatically spin up multiple processes and route
traffic between them. We recommend as many workers as you have
available cores.

## Running via Command

If you like using command line arguments, you can launch a Sanic server
by executing the module. For example, if you initialized Sanic as
`app` in a file named `server.py`, you could run the server like so:

`python -m sanic server.app --host=0.0.0.0 --port=1337 --workers=4`

With this way of running Sanic, it is not necessary to call `app.run` in
your Python file. If you do, just make sure you wrap it in an
`if __name__ == '__main__'` guard, like so:

```python
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=1337, workers=4)
```
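For reference, a minimal sketch of the `server.py` module this command points at. The module name, route, and handler below are illustrative assumptions, not part of this PR:

```python
# server.py -- hypothetical app module targeted by `python -m sanic server.app`
from sanic import Sanic
from sanic.response import json

app = Sanic(__name__)


@app.route('/')
async def index(request):
    return json({'hello': 'world'})


# Optional guard so the file can also be executed directly without the
# module import performed by the CLI starting a second server.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=1337, workers=4)
```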
@@ -8,3 +8,4 @@ tox
gunicorn
bottle
kyoukai
falcon
@@ -0,0 +1,36 @@
from argparse import ArgumentParser
from importlib import import_module

from .log import log
from .sanic import Sanic

if __name__ == "__main__":
    parser = ArgumentParser(prog='sanic')
    parser.add_argument('--host', dest='host', type=str, default='127.0.0.1')
    parser.add_argument('--port', dest='port', type=int, default=8000)
    parser.add_argument('--workers', dest='workers', type=int, default=1)
    parser.add_argument('--debug', dest='debug', action="store_true")
    parser.add_argument('module')
    args = parser.parse_args()

    try:
        module_parts = args.module.split(".")
        module_name = ".".join(module_parts[:-1])
        app_name = module_parts[-1]

        module = import_module(module_name)
        app = getattr(module, app_name, None)
        if type(app) is not Sanic:
            raise ValueError("Module is not a Sanic app, it is a {}. "
                             "Perhaps you meant {}.app?"
                             .format(type(app).__name__, args.module))

        app.run(host=args.host, port=args.port,
                workers=args.workers, debug=args.debug)
    except ImportError:
        log.error("No module named {} found.\n"
                  "  Example File: project/sanic_server.py -> app\n"
                  "  Example Module: project.sanic_server.app"
                  .format(module_name))
    except ValueError as e:
        log.error("{}".format(e))
@@ -1,5 +1,8 @@
import asyncio
from asyncio import get_event_loop
from inspect import isawaitable
from multiprocessing import Process, Event

[Review comment] I think the multiprocessing functionality should not be built into the framework itself; just let the user decide whether or not to use multiple processes.

[Reply] If run is executed with 1 worker, it won't use multiprocessing. Should we not import it by default? I thought it would be okay since it's part of the standard library.

from signal import signal, SIGTERM, SIGINT
from time import sleep
from traceback import format_exc

from .config import Config
@@ -167,7 +170,7 @@ async def handle_request(self, request, response_callback):
    # -------------------------------------------------------------------- #

    def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
            before_stop=None):
            before_stop=None, sock=None, workers=1):
        """
        Runs the HTTP Server and listens until keyboard interrupt or term
        signal. On termination, drains connections before closing.
@@ -178,11 +181,24 @@ def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
                            listening
        :param before_stop: Function to be executed when a stop signal is
                            received before it is respected
        :param sock: Socket for the server to accept connections from
        :param workers: Number of processes
        :return: Nothing
        """
        self.error_handler.debug = True
        self.debug = debug

        server_settings = {
            'host': host,
            'port': port,
            'sock': sock,
            'debug': debug,
            'request_handler': self.handle_request,
            'request_timeout': self.config.REQUEST_TIMEOUT,
            'request_max_size': self.config.REQUEST_MAX_SIZE,
        }

        if debug:
            log.setLevel(logging.DEBUG)
            log.debug(self.config.LOGO)
@@ -191,23 +207,61 @@ def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
        log.info('Goin\' Fast @ http://{}:{}'.format(host, port))

        try:
            serve(
                host=host,
                port=port,
                debug=debug,
                after_start=after_start,
                before_stop=before_stop,
                request_handler=self.handle_request,
                request_timeout=self.config.REQUEST_TIMEOUT,
                request_max_size=self.config.REQUEST_MAX_SIZE,
            )
            if workers == 1:
                server_settings['after_start'] = after_start
                server_settings['before_stop'] = before_stop
                serve(**server_settings)
            else:
                log.info('Spinning up {} workers...'.format(workers))

                self.serve_multiple(server_settings, workers)

        except Exception as e:
            log.exception(
                'Experienced exception while trying to serve: {}'.format(e))
            pass

        log.info("Server Stopped")

    def stop(self):
        """
        This kills the Sanic
        """
        asyncio.get_event_loop().stop()
        get_event_loop().stop()

    @staticmethod
    def serve_multiple(server_settings, workers, stop_event=None):
        """
        Starts multiple server processes simultaneously. Stops on interrupt
        and terminate signals, and drains connections when complete.
        :param server_settings: kw arguments to be passed to the serve function
        :param workers: number of workers to launch
        :param stop_event: if provided, is used as a stop signal
        :return:
        """
        server_settings['reuse_port'] = True

        # Create a stop event to be triggered by a signal
        if not stop_event:
            stop_event = Event()
            signal(SIGINT, lambda s, f: stop_event.set())
            signal(SIGTERM, lambda s, f: stop_event.set())

        processes = []
        for w in range(workers):
            process = Process(target=serve, kwargs=server_settings)
            process.start()
            processes.append(process)

        # Infinitely wait for the stop event
        try:
            while not stop_event.is_set():
                sleep(0.3)
        except:
            pass

        log.info('Spinning down workers...')
        for process in processes:
            process.terminate()
        for process in processes:
            process.join()
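The piece that makes this work is `server_settings['reuse_port'] = True`: each worker process binds its own socket to the same host and port, and the kernel load-balances incoming connections between them. A minimal standalone sketch of that pattern (not Sanic code, and it assumes a platform that supports `SO_REUSEPORT`, such as Linux):

```python
import socket
from multiprocessing import Event, Process
from signal import SIGINT, SIGTERM, signal
from time import sleep


def worker(port):
    # Each worker binds the same port; SO_REUSEPORT lets the kernel
    # distribute incoming connections across the processes.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(('127.0.0.1', port))
    sock.listen(100)
    while True:
        conn, _ = sock.accept()
        conn.sendall(b'HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok')
        conn.close()


if __name__ == '__main__':
    stop_event = Event()
    signal(SIGINT, lambda s, f: stop_event.set())
    signal(SIGTERM, lambda s, f: stop_event.set())

    procs = [Process(target=worker, args=(1337,)) for _ in range(4)]
    for p in procs:
        p.start()

    while not stop_event.is_set():   # park the parent until a signal arrives
        sleep(0.3)

    for p in procs:
        p.terminate()
    for p in procs:
        p.join()
```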
@@ -0,0 +1,52 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads

from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT


# ------------------------------------------------------------ #
#  GET
# ------------------------------------------------------------ #

def test_json():
    app = Sanic('test_json')

    response = Array('c', 50)
    @app.route('/')
    async def handler(request):
        return json({"test": True})

    stop_event = Event()
    async def after_start(*args, **kwargs):
        http_response = await local_request('get', '/')
        response.value = http_response.text.encode()
        stop_event.set()

    def rescue_crew():
        sleep(5)
        stop_event.set()

    rescue_process = Process(target=rescue_crew)
    rescue_process.start()

    app.serve_multiple({
        'host': HOST,
        'port': PORT,
        'after_start': after_start,
        'request_handler': app.handle_request,
        'request_max_size': 100000,
    }, workers=2, stop_event=stop_event)

    rescue_process.terminate()

    try:
        results = json_loads(response.value)
    except:
        raise ValueError("Expected JSON response but got '{}'".format(response))

    assert results.get('test') == True


test_json()
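The `Array('c', 50)` is what lets the worker process that handles the request hand the response bytes back to the parent: a ctypes char buffer in shared memory is visible to both sides. A tiny sketch of that mechanism, independent of Sanic:

```python
from multiprocessing import Array, Process


def child(buf):
    # The child process writes into the shared ctypes buffer...
    buf.value = b'{"test": true}'


if __name__ == '__main__':
    shared = Array('c', 50)   # 50-byte shared buffer, like `response` above
    p = Process(target=child, args=(shared,))
    p.start()
    p.join()
    print(shared.value)       # ...and the parent reads b'{"test": true}'
```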
@@ -0,0 +1,11 @@
# Run with: gunicorn --workers=1 --worker-class=meinheld.gmeinheld.MeinheldWorker falc:app

import falcon
import ujson as json

class TestResource:
    def on_get(self, req, resp):
        resp.body = json.dumps({"test": True})

app = falcon.API()
app.add_route('/', TestResource())
@@ -0,0 +1,53 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads

from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT


# ------------------------------------------------------------ #
#  GET
# ------------------------------------------------------------ #

# TODO: Figure out why this freezes on pytest but not when
# executed via interpreter

def skip_test_multiprocessing():
    app = Sanic('test_json')

    response = Array('c', 50)
    @app.route('/')
    async def handler(request):
        return json({"test": True})

    stop_event = Event()
    async def after_start(*args, **kwargs):
        http_response = await local_request('get', '/')
        response.value = http_response.text.encode()
        stop_event.set()

    def rescue_crew():
        sleep(5)
        stop_event.set()

    rescue_process = Process(target=rescue_crew)
    rescue_process.start()

    app.serve_multiple({
        'host': HOST,
        'port': PORT,
        'after_start': after_start,
        'request_handler': app.handle_request,
        'request_max_size': 100000,
    }, workers=2, stop_event=stop_event)

    rescue_process.terminate()

    try:
        results = json_loads(response.value)
    except:
        raise ValueError("Expected JSON response but got '{}'".format(response))

    assert results.get('test') == True