Skip to content

Commit

Permalink
[batch] Change server to use aiohttp (#5563)
Browse files Browse the repository at this point in the history
  • Loading branch information
jigold authored and danking committed Mar 18, 2019
1 parent 89711a8 commit e0391af
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 57 deletions.
128 changes: 77 additions & 51 deletions batch/batch/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@
from collections import Counter
import logging
import threading
from flask import Flask, request, jsonify, abort, render_template
import kubernetes as kube
import cerberus
import requests
import uvloop
import aiohttp_jinja2
import jinja2
from aiohttp import web

from .globals import max_id, pod_name_job, job_id_job, _log_path, _read_file, batch_id_batch
from .globals import max_id, _log_path, _read_file, pod_name_job, job_id_job, batch_id_batch
from .globals import next_id, get_recent_events, add_event

from .. import schemas

uvloop.install()

s = sched.scheduler()


Expand Down Expand Up @@ -78,6 +83,22 @@ def make_logger():
instance_id = uuid.uuid4().hex
log.info(f'instance_id = {instance_id}')

app = web.Application()
routes = web.RouteTableDef()
aiohttp_jinja2.setup(app, loader=jinja2.PackageLoader('batch', 'templates'))


def abort(code, reason=None):
    """Abort the current request by raising an aiohttp HTTP exception.

    Args:
        code: HTTP status code to respond with.
        reason: Optional reason phrase forwarded to the exception.

    Raises:
        web.HTTPBadRequest: when ``code`` is 400.
        web.HTTPNotFound: when ``code`` is 404.
        web.HTTPException: a one-off subclass carrying ``code`` for any
            other status.
    """
    if code == 400:
        raise web.HTTPBadRequest(reason=reason)
    if code == 404:
        raise web.HTTPNotFound(reason=reason)

    # The base web.HTTPException only carries a placeholder status_code
    # (-1), so raising it directly would not send the requested status.
    # Bind the requested code on a throwaway subclass instead; callers that
    # catch web.HTTPException are unaffected.
    class _Abort(web.HTTPException):
        status_code = code

    raise _Abort(reason=reason)


def jsonify(data):
    """Wrap ``data`` in an aiohttp JSON response and return it."""
    response = web.json_response(data)
    return response


class JobTask: # pylint: disable=R0903
@staticmethod
Expand Down Expand Up @@ -395,12 +416,12 @@ def handler(id, callback, json):
f'callback for job {id} failed due to an error, I will not retry. '
f'Error: {exc}')

threading.Thread(target=handler, args=(self.id, self.callback, self.to_json())).start()
threading.Thread(target=handler, args=(self.id, self.callback, self.to_dict())).start()

if self.batch_id:
batch_id_batch[self.batch_id].mark_job_complete(self)

def to_json(self):
def to_dict(self):
result = {
'id': self.id,
'state': self._state
Expand All @@ -421,14 +442,9 @@ def to_json(self):
return result


app = Flask('batch')

log.info(f'app.root_path = {app.root_path}')


@app.route('/jobs/create', methods=['POST'])
def create_job(): # pylint: disable=R0912
parameters = request.json
@routes.post('/jobs/create')
async def create_job(request): # pylint: disable=R0912
parameters = await request.json()

schema = {
# will be validated when creating pod
Expand Down Expand Up @@ -508,7 +524,7 @@ def create_job(): # pylint: disable=R0912
output_files,
copy_service_account_name,
always_run)
return jsonify(job.to_json())
return jsonify(job.to_dict())


def both_or_neither(x, y): # pylint: disable=C0103
Expand All @@ -517,21 +533,23 @@ def both_or_neither(x, y): # pylint: disable=C0103
return x == y


@app.route('/jobs', methods=['GET'])
def get_job_list():
return jsonify([job.to_json() for _, job in job_id_job.items()])
@routes.get('/jobs')
async def get_job_list(request):  # pylint: disable=W0613
    """Return a JSON list with the dict form of every job in ``job_id_job``."""
    all_jobs = [job.to_dict() for job in job_id_job.values()]
    return jsonify(all_jobs)


@app.route('/jobs/<int:job_id>', methods=['GET'])
def get_job(job_id):
@routes.get('/jobs/{job_id}')
async def get_job(request):
job_id = int(request.match_info['job_id'])
job = job_id_job.get(job_id)
if not job:
abort(404)
return jsonify(job.to_json())
return jsonify(job.to_dict())


@app.route('/jobs/<int:job_id>/log', methods=['GET'])
def get_job_log(job_id): # pylint: disable=R1710
@routes.get('/jobs/{job_id}/log')
async def get_job_log(request): # pylint: disable=R1710
job_id = int(request.match_info['job_id'])
if job_id > max_id():
abort(404)

Expand All @@ -551,17 +569,19 @@ def get_job_log(job_id): # pylint: disable=R1710
abort(404)


@app.route('/jobs/<int:job_id>/delete', methods=['DELETE'])
def delete_job(job_id):
@routes.delete('/jobs/{job_id}/delete')
async def delete_job(request):
    """Delete the job identified by the ``job_id`` path segment.

    Responds with an empty JSON object on success, and with 404 when the
    segment is not an integer or no job with that id is registered.
    """
    try:
        job_id = int(request.match_info['job_id'])
    except ValueError:
        # '{job_id}' matches any path segment, so a non-numeric id reaches
        # this handler; report it as an unknown job instead of letting the
        # ValueError surface as a 500.
        abort(404)
    job = job_id_job.get(job_id)
    if not job:
        abort(404)
    job.delete()
    return jsonify({})


@app.route('/jobs/<int:job_id>/cancel', methods=['POST'])
def cancel_job(job_id):
@routes.post('/jobs/{job_id}/cancel')
async def cancel_job(request):
job_id = int(request.match_info['job_id'])
job = job_id_job.get(job_id)
if not job:
abort(404)
Expand Down Expand Up @@ -606,7 +626,7 @@ def handler(id, job_id, callback, json):

threading.Thread(
target=handler,
args=(self.id, job.id, self.callback, job.to_json())
args=(self.id, job.id, self.callback, job.to_dict())
).start()

def close(self):
Expand All @@ -616,7 +636,7 @@ def close(self):
else:
log.info(f're-closing batch {self.id}, ttl was {self.ttl}')

def to_json(self):
def to_dict(self):
state_count = Counter([j._state for j in self.jobs])
return {
'id': self.id,
Expand All @@ -631,9 +651,9 @@ def to_json(self):
}


@app.route('/batches/create', methods=['POST'])
def create_batch():
parameters = request.json
@routes.post('/batches/create')
async def create_batch(request):
parameters = await request.json()

schema = {
'attributes': {
Expand All @@ -649,28 +669,31 @@ def create_batch():
abort(400, 'invalid request: {}'.format(validator.errors))

batch = Batch(parameters.get('attributes'), parameters.get('callback'), parameters.get('ttl'))
return jsonify(batch.to_json())
return jsonify(batch.to_dict())


@app.route('/batches/<int:batch_id>', methods=['GET'])
def get_batch(batch_id):
@routes.get('/batches/{batch_id}')
async def get_batch(request):
batch_id = int(request.match_info['batch_id'])
batch = batch_id_batch.get(batch_id)
if not batch:
abort(404)
return jsonify(batch.to_json())
return jsonify(batch.to_dict())


@app.route('/batches/<int:batch_id>/delete', methods=['DELETE'])
def delete_batch(batch_id):
@routes.delete('/batches/{batch_id}/delete')
async def delete_batch(request):
    """Delete the batch identified by the ``batch_id`` path segment.

    Responds with an empty JSON object on success, and with 404 when the
    segment is not an integer or no batch with that id is registered.
    """
    try:
        batch_id = int(request.match_info['batch_id'])
    except ValueError:
        # '{batch_id}' matches any path segment, so a non-numeric id reaches
        # this handler; report it as an unknown batch instead of letting the
        # ValueError surface as a 500.
        abort(404)
    batch = batch_id_batch.get(batch_id)
    if not batch:
        abort(404)
    batch.delete()
    return jsonify({})


@app.route('/batches/<int:batch_id>/close', methods=['POST'])
def close_batch(batch_id):
@routes.post('/batches/{batch_id}/close')
async def close_batch(request):
batch_id = int(request.match_info['batch_id'])
batch = batch_id_batch.get(batch_id)
if not batch:
abort(404)
Expand All @@ -691,13 +714,14 @@ def update_job_with_pod(job, pod):
job.mark_unscheduled()


@app.route('/pod_changed', methods=['POST'])
def pod_changed():
parameters = request.json
@routes.post('/pod_changed')
async def pod_changed(request):
parameters = await request.json()

pod_name = parameters['pod_name']

job = pod_name_job.get(pod_name)

if job and not job.is_complete():
try:
pod = v1.read_namespaced_pod(
Expand All @@ -712,11 +736,11 @@ def pod_changed():

update_job_with_pod(job, pod)

return '', 204
return web.Response(status=204)


@app.route('/refresh_k8s_state', methods=['POST'])
def refresh_k8s_state():
@routes.post('/refresh_k8s_state')
async def refresh_k8s_state(request): # pylint: disable=W0613
log.info('started k8s state refresh')

pods = v1.list_namespaced_pod(
Expand All @@ -739,13 +763,14 @@ def refresh_k8s_state():

log.info('k8s state refresh complete')

return '', 204
return web.Response(status=204)


@app.route('/recent', methods=['GET'])
def recent():
@routes.get('/recent')
@aiohttp_jinja2.template('recent.html')
async def recent(request): # pylint: disable=W0613
recent_events = get_recent_events()
return render_template('recent.html', recent=list(reversed(recent_events)))
return {'recent': list(reversed(recent_events))}


def run_forever(target, *args, **kwargs):
Expand Down Expand Up @@ -773,8 +798,9 @@ def run_once(target, *args, **kwargs):
log.error(f'run_forever: {target.__name__} caught_exception: ', exc_info=sys.exc_info())


def flask_event_loop(port):
app.run(threaded=False, host='0.0.0.0', port=port)
def aiohttp_event_loop(port):
    """Register the module's route table on ``app`` and serve it on ``port``.

    The server listens on all interfaces ('0.0.0.0').
    """
    app.add_routes(routes)
    listen = {'host': '0.0.0.0', 'port': port}
    web.run_app(app, **listen)


def kube_event_loop(port):
Expand Down Expand Up @@ -827,6 +853,6 @@ def serve(port=5000):
# see: https://stackoverflow.com/questions/31264826/start-a-flask-application-in-separate-thread
# flask_thread = threading.Thread(target=flask_event_loop)
# flask_thread.start()
run_forever(flask_event_loop, port)
run_forever(aiohttp_event_loop, port)

kube_thread.join()
8 changes: 7 additions & 1 deletion batch/environment.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: hail-batch
dependencies:
- python=3.6
- python=3.7
- werkzeug
- flask
- pip
Expand All @@ -11,3 +11,9 @@ dependencies:
- pylint
- pytest
- requests
- aiohttp
- aiodns
- cchardet
- aiohttp_jinja2
- jinja2
- uvloop>=0.12
7 changes: 7 additions & 0 deletions batch/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,12 @@
'cerberus',
'kubernetes',
'flask',
'requests',
'aiohttp',
'aiodns',
'cchardet',
'aiohttp_jinja2',
'jinja2',
'uvloop>=0.12'
],
)
2 changes: 1 addition & 1 deletion hail-ci-build-image
Original file line number Diff line number Diff line change
@@ -1 +1 @@
gcr.io/hail-vdc/hail-pr-builder:4add3ab6bbb110e33fc7ef3f080015ce688e1a4db530032e8ac89e0d21ccd3f5
gcr.io/hail-vdc/hail-pr-builder:ce282874ade4260da2beb442c01dd1d2fb3302b6af80929da2a1e793e0fa5e58
4 changes: 2 additions & 2 deletions pipeline/pipeline/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,5 +272,5 @@ def write_pipeline_outputs(r, dest):

if failed_jobs or status['jobs']['Complete'] != n_jobs_submitted:
raise Exception(fail_msg)
else:
print("Pipeline completed successfully!")

print("Pipeline completed successfully!")
4 changes: 2 additions & 2 deletions pipeline/pipeline/pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# many local variables, W0603 using the global statement, R0902 too many
# instance attributes, R1705 unnecessary "else" after "return", W0511 fixme,
# R0903 Too few public methods, R0401 Cyclic import, R0801 Similar lines,
# R0912 Too many branches
# R0912 Too many branches, R1720 no-else-raise

disable=C0111,W1203,W1202,C0111,R0913,W0622,W0212,W0621,R0914,W0603,R0902,R1705,W0511,R0903,R0401,R0801,R0912
disable=C0111,W1203,W1202,C0111,R0913,W0622,W0212,W0621,R0914,W0603,R0902,R1705,W0511,R0903,R0401,R0801,R0912,R1720

[FORMAT]
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
Expand Down

0 comments on commit e0391af

Please sign in to comment.