Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added multiprocessing #57

Merged
merged 3 commits into from
Oct 18, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ app.run(host="0.0.0.0", port=8000)
* [Middleware](docs/middleware.md)
* [Exceptions](docs/exceptions.md)
* [Blueprints](docs/blueprints.md)
* [Deploying](docs/deploying.md)
* [Contributing](docs/contributing.md)
* [License](LICENSE)

Expand Down
35 changes: 35 additions & 0 deletions docs/deploying.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Deploying

When it comes to deploying Sanic, there's not much to it, but there are
a few things to take note of.

## Workers

By default, Sanic listens in the main process using only 1 CPU core.
To crank up the juice, just specify the number of workers in the run
arguments like so:

```python
app.run(host='0.0.0.0', port=1337, workers=4)
```

Sanic will automatically spin up multiple processes and route
traffic between them. We recommend as many workers as you have
available cores.

## Running via Command

If you like using command line arguments, you can launch a sanic server
by executing the module. For example, if you initialized sanic as
app in a file named server.py, you could run the server like so:

`python -m sanic server.app --host=0.0.0.0 --port=1337 --workers=4`

With this way of running sanic, it is not necessary to run app.run in
your python file. If you do, just make sure you wrap it in name == main
like so:

```python
if __name__ == '__main__':
app.run(host='0.0.0.0', port=1337, workers=4)
```
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ tox
gunicorn
bottle
kyoukai
falcon
36 changes: 36 additions & 0 deletions sanic/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from argparse import ArgumentParser
from importlib import import_module

from .log import log
from .sanic import Sanic

if __name__ == "__main__":
parser = ArgumentParser(prog='sanic')
parser.add_argument('--host', dest='host', type=str, default='127.0.0.1')
parser.add_argument('--port', dest='port', type=int, default=8000)
parser.add_argument('--workers', dest='workers', type=int, default=1, )
parser.add_argument('--debug', dest='debug', action="store_true")
parser.add_argument('module')
args = parser.parse_args()

try:
module_parts = args.module.split(".")
module_name = ".".join(module_parts[:-1])
app_name = module_parts[-1]

module = import_module(module_name)
app = getattr(module, app_name, None)
if type(app) is not Sanic:
raise ValueError("Module is not a Sanic app, it is a {}. "
"Perhaps you meant {}.app?"
.format(type(app).__name__, args.module))

app.run(host=args.host, port=args.port,
workers=args.workers, debug=args.debug)
except ImportError:
log.error("No module named {} found.\n"
" Example File: project/sanic_server.py -> app\n"
" Example Module: project.sanic_server.app"
.format(module_name))
except ValueError as e:
log.error("{}".format(e))
81 changes: 68 additions & 13 deletions sanic/sanic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import asyncio
from argparse import ArgumentParser
from asyncio import get_event_loop
from inspect import isawaitable
from multiprocessing import Process, Event
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the multiprocess function should not contain in the program, just let user to decide use multi process or not

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the multiprocess function should not contain in the program, just let user to decide use multi process or not

Copy link
Contributor Author

@channelcat channelcat Oct 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If run is executed with 1 worker, it won't use multiprocessing. Should we not import it by default? I thought it would be okay since it's part of the standard library.

from signal import signal, SIGTERM, SIGINT
from time import sleep
from traceback import format_exc

from .config import Config
Expand Down Expand Up @@ -167,7 +171,7 @@ async def handle_request(self, request, response_callback):
# -------------------------------------------------------------------- #

def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
before_stop=None):
before_stop=None, sock=None, workers=1):
"""
Runs the HTTP Server and listens until keyboard interrupt or term
signal. On termination, drains connections before closing.
Expand All @@ -178,11 +182,24 @@ def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
listening
:param before_stop: Function to be executed when a stop signal is
received before it is respected
:param sock: Socket for the server to accept connections from
:param workers: Number of processes
received before it is respected
:return: Nothing
"""
self.error_handler.debug = True
self.debug = debug

server_settings = {
'host': host,
'port': port,
'sock': sock,
'debug': debug,
'request_handler': self.handle_request,
'request_timeout': self.config.REQUEST_TIMEOUT,
'request_max_size': self.config.REQUEST_MAX_SIZE,
}

if debug:
log.setLevel(logging.DEBUG)
log.debug(self.config.LOGO)
Expand All @@ -191,23 +208,61 @@ def run(self, host="127.0.0.1", port=8000, debug=False, after_start=None,
log.info('Goin\' Fast @ http://{}:{}'.format(host, port))

try:
serve(
host=host,
port=port,
debug=debug,
after_start=after_start,
before_stop=before_stop,
request_handler=self.handle_request,
request_timeout=self.config.REQUEST_TIMEOUT,
request_max_size=self.config.REQUEST_MAX_SIZE,
)
if workers == 1:
server_settings['after_start'] = after_start
server_settings['before_stop'] = before_stop
serve(**server_settings)
else:
log.info('Spinning up {} workers...'.format(workers))

self.serve_multiple(server_settings, workers)

except Exception as e:
log.exception(
'Experienced exception while trying to serve: {}'.format(e))
pass

log.info("Server Stopped")

def stop(self):
"""
This kills the Sanic
"""
asyncio.get_event_loop().stop()
get_event_loop().stop()

@staticmethod
def serve_multiple(server_settings, workers, stop_event=None):
"""
Starts multiple server processes simultaneously. Stops on interrupt
and terminate signals, and drains connections when complete.
:param server_settings: kw arguments to be passed to the serve function
:param workers: number of workers to launch
:param stop_event: if provided, is used as a stop signal
:return:
"""
server_settings['reuse_port'] = True

# Create a stop event to be triggered by a signal
if not stop_event:
stop_event = Event()
signal(SIGINT, lambda s, f: stop_event.set())
signal(SIGTERM, lambda s, f: stop_event.set())

processes = []
for w in range(workers):
process = Process(target=serve, kwargs=server_settings)
process.start()
processes.append(process)

# Infinitely wait for the stop event
try:
while not stop_event.is_set():
sleep(0.3)
except:
pass

log.info('Spinning down workers...')
for process in processes:
process.terminate()
for process in processes:
process.join()
7 changes: 3 additions & 4 deletions sanic/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ def close_if_idle(self):


def serve(host, port, request_handler, after_start=None, before_stop=None,
debug=False, request_timeout=60,
request_max_size=None):
debug=False, request_timeout=60, sock=None,
request_max_size=None, reuse_port=False):
# Create Event Loop
loop = async_loop.new_event_loop()
asyncio.set_event_loop(loop)
Expand All @@ -176,7 +176,7 @@ def serve(host, port, request_handler, after_start=None, before_stop=None,
request_handler=request_handler,
request_timeout=request_timeout,
request_max_size=request_max_size,
), host, port)
), host, port, reuse_port=reuse_port, sock=sock)
try:
http_server = loop.run_until_complete(server_coroutine)
except Exception as e:
Expand Down Expand Up @@ -217,4 +217,3 @@ def serve(host, port, request_handler, after_start=None, before_stop=None,
loop.run_until_complete(asyncio.sleep(0.1))

loop.close()
log.info("Server Stopped")
52 changes: 52 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads

from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT


# ------------------------------------------------------------ #
# GET
# ------------------------------------------------------------ #

def test_json():
app = Sanic('test_json')

response = Array('c', 50)
@app.route('/')
async def handler(request):
return json({"test": True})

stop_event = Event()
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
response.value = http_response.text.encode()
stop_event.set()

def rescue_crew():
sleep(5)
stop_event.set()

rescue_process = Process(target=rescue_crew)
rescue_process.start()

app.serve_multiple({
'host': HOST,
'port': PORT,
'after_start': after_start,
'request_handler': app.handle_request,
'request_max_size': 100000,
}, workers=2, stop_event=stop_event)

rescue_process.terminate()

try:
results = json_loads(response.value)
except:
raise ValueError("Expected JSON response but got '{}'".format(response))

assert results.get('test') == True

test_json()
11 changes: 11 additions & 0 deletions tests/performance/falcon/simple_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Run with: gunicorn --workers=1 --worker-class=meinheld.gmeinheld.MeinheldWorker falc:app

import falcon
import ujson as json

class TestResource:
def on_get(self, req, resp):
resp.body = json.dumps({"test": True})

app = falcon.API()
app.add_route('/', TestResource())
4 changes: 2 additions & 2 deletions tests/performance/sanic/simple_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
async def test(request):
return json({"test": True})


app.run(host="0.0.0.0", port=sys.argv[1])
if __name__ == '__main__':
app.run(host="0.0.0.0", port=sys.argv[1])
53 changes: 53 additions & 0 deletions tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from multiprocessing import Array, Event, Process
from time import sleep
from ujson import loads as json_loads

from sanic import Sanic
from sanic.response import json
from sanic.utils import local_request, HOST, PORT


# ------------------------------------------------------------ #
# GET
# ------------------------------------------------------------ #

# TODO: Figure out why this freezes on pytest but not when
# executed via interpreter

def skip_test_multiprocessing():
app = Sanic('test_json')

response = Array('c', 50)
@app.route('/')
async def handler(request):
return json({"test": True})

stop_event = Event()
async def after_start(*args, **kwargs):
http_response = await local_request('get', '/')
response.value = http_response.text.encode()
stop_event.set()

def rescue_crew():
sleep(5)
stop_event.set()

rescue_process = Process(target=rescue_crew)
rescue_process.start()

app.serve_multiple({
'host': HOST,
'port': PORT,
'after_start': after_start,
'request_handler': app.handle_request,
'request_max_size': 100000,
}, workers=2, stop_event=stop_event)

rescue_process.terminate()

try:
results = json_loads(response.value)
except:
raise ValueError("Expected JSON response but got '{}'".format(response))

assert results.get('test') == True