Skip to content

Commit

Permalink
feat: add load balancing to pods
Browse files Browse the repository at this point in the history
Signed-off-by: Han Xiao <[email protected]>
  • Loading branch information
hanxiao committed Apr 5, 2020
1 parent b8e186e commit 84ff5b8
Show file tree
Hide file tree
Showing 19 changed files with 307 additions and 150 deletions.
8 changes: 0 additions & 8 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->


- [Release Cycle](#release-cycle)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

# Release Cycle

We follow the [semantic versioning](https://semver.org/). Say current master version is at `x.y.z`,
Expand Down
1 change: 1 addition & 0 deletions docs/chapters/RELEASE.md
16 changes: 8 additions & 8 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
sphinx
sphinx-argparse
sphinxcontrib-apidoc
sphinx-autodoc-typehints
sphinx_rtd_theme
recommonmark
sphinx_markdown_tables
sphinx_copybutton
sphinx==2.4.4
sphinx-argparse==0.2.5
sphinxcontrib-apidoc==0.3.0
sphinx-autodoc-typehints==1.10.3
sphinx_rtd_theme==0.4.3
recommonmark==0.6.0
sphinx_markdown_tables==0.0.12
sphinx_copybutton==0.2.10
2 changes: 1 addition & 1 deletion jina/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# do not change this line manually
# this is managed by proto/build-proto.sh and updated on every execution
__proto_version__ = '0.0.15'
__proto_version__ = '0.0.18'

import platform
import sys
Expand Down
5 changes: 4 additions & 1 deletion jina/clients/python/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,8 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
elapsed = time.perf_counter() - self.start_time
speed = self.num_bars / elapsed
if self.proc_doc > 0:
speed = self.proc_doc / elapsed
else:
speed = self.num_bars / elapsed
sys.stdout.write('\t%s\n' % colored(f'done in {elapsed:3.1f}s @ {speed:3.1f}/s', 'green'))
7 changes: 6 additions & 1 deletion jina/drivers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class BaseDriver(metaclass=DriverType):

def __init__(self, *args, **kwargs):
self.attached = False #: represent if this driver is attached to a :class:`jina.peapods.pea.BasePea` (& :class:`jina.executors.BaseExecutor`)
self.pea = None
self.pea = None # type: 'BasePea'

def attach(self, pea: 'BasePea', *args, **kwargs):
"""Attach this driver to a :class:`jina.peapods.pea.BasePea`
Expand Down Expand Up @@ -108,6 +108,11 @@ def msg(self) -> 'jina_pb2.Message':
"""Get the current request, shortcut to ``self.pea.message``"""
return self.pea.message

@property
def envelope(self) -> 'jina_pb2.Envelope':
"""Get the current request, shortcut to ``self.pea.message``"""
return self.pea.message.envelope

@property
def prev_msgs(self) -> List['jina_pb2.Message']:
"""Get all previous messages that has the same ``request_id``, shortcut to ``self.pea.prev_messages``
Expand Down
52 changes: 47 additions & 5 deletions jina/drivers/control.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time

from . import BaseDriver
from ..excepts import UnknownControlCommand, RequestLoopEnd
from ..excepts import UnknownControlCommand, RequestLoopEnd, NoExplicitMessage
from ..proto import jina_pb2


Expand All @@ -10,14 +10,14 @@ class ControlReqDriver(BaseDriver):

def __call__(self, *args, **kwargs):
if self.req.command == jina_pb2.Request.ControlRequest.TERMINATE:
self.msg.envelope.status = jina_pb2.Envelope.SUCCESS
self.envelope.status = jina_pb2.Envelope.SUCCESS
raise RequestLoopEnd
elif self.req.command == jina_pb2.Request.ControlRequest.STATUS:
self.msg.envelope.status = jina_pb2.Envelope.READY
self.envelope.status = jina_pb2.Envelope.READY
for k, v in vars(self.pea.args).items():
self.req.args[k] = str(v)
elif self.req.command == jina_pb2.Request.ControlRequest.DRYRUN:
self.msg.envelope.status = jina_pb2.Envelope.READY
self.envelope.status = jina_pb2.Envelope.READY
else:
raise UnknownControlCommand('don\'t know how to handle %s' % self.req)

Expand All @@ -37,7 +37,49 @@ def __call__(self, *args, **kwargs):


class ForwardDriver(BaseDriver):
"""Route the message to next pod"""
"""Forward the message to next pod"""

def __call__(self, *args, **kwargs):
pass


class RouteDriver(ControlReqDriver):
"""A simple load balancer forward message to the next available pea
- The dealer never receives a control request from the router,
everytime it finishes a job and send via out_sock, it returns the envelope with control
request idle back to the router. The dealer also sends control request idle to the router
when it first starts.
- The router receives request from both dealer and upstream pusher.
if it is a upstream request, use LB to schedule the receiver, mark it in the envelope
if it is a control request in
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.idle_dealer_ids = set()
self.is_pollin_paused = False

def __call__(self, *args, **kwargs):
if not isinstance(self.req, jina_pb2.Request.ControlRequest):
self.logger.info(self.idle_dealer_ids)
if self.idle_dealer_ids:
dealer_id = self.idle_dealer_ids.pop()
self.envelope.receiver_id = dealer_id
if not self.idle_dealer_ids:
self.pea.zmqlet.pause_pollin()
self.is_pollin_paused = True
else:
raise RuntimeError('if this router connects more than one dealer, '
'then this error should never be raised')
elif self.req.command == jina_pb2.Request.ControlRequest.IDLE:
self.idle_dealer_ids.add(self.envelope.receiver_id)
self.logger.info(f'{self.envelope.receiver_id} is idle')
if self.is_pollin_paused:
self.pea.zmqlet.resume_pollin()
self.is_pollin_paused = False
raise NoExplicitMessage
else:
super().__call__(*args, **kwargs)
13 changes: 10 additions & 3 deletions jina/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ def from_string(cls, s: str):
raise ValueError('%s is not a valid enum for %s' % (s.upper(), cls))


class SchedulerType(BetterEnum):
LOAD_BALANCE = 0 #: balance the workload between Peas, faster peas get more work
ROUND_ROBIN = 1 #: workload are scheduled round-robin manner to the peas, assuming all peas have uniform processing speed.


class ReplicaType(BetterEnum):
"""The enum for representing the parallel type of peas in a pod
.. note::
``PUSH_BLOCK`` does not exist as push message has different request ids, they can not be blocked
"""
PUSH_NONBLOCK = 1 #: push without blocking
PUB_BLOCK = 2 #: publish message with blocking (in this case, blocking means collecting all published messages until next)
PUB_NONBLOCK = 3 #: publish message but no blocking
ONEOF = 1 #: one of the replica will receive the message
ALL_SYNC = 2 #: all replica will receive the message, blocked until all done with the message
ALL = 3 #: all replica will receive the message

@property
def is_push(self) -> bool:
Expand Down Expand Up @@ -74,6 +79,8 @@ class SocketType(BetterEnum):
PUB_CONNECT = 7
PAIR_BIND = 8
PAIR_CONNECT = 9
ROUTER_BIND = 10
DEALER_CONNECT = 11

@property
def is_bind(self) -> bool:
Expand Down
6 changes: 5 additions & 1 deletion jina/excepts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class MismatchedVersion(Exception):
"""When the jina version info of the incoming message does not match the local jina version"""


class WaitPendingMessage(Exception):
class NoExplicitMessage(Exception):
"""Waiting until all partial messages are received"""


Expand Down Expand Up @@ -87,3 +87,7 @@ class GRPCGatewayError(Exception):

class GRPCServerError(Exception):
"""Can not connect to the grpc gateway"""


class NoIdleDealer(Exception):
"""All dealers are exhausted no more idle dealer"""
12 changes: 5 additions & 7 deletions jina/helper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import os
import random
import re
import string
import sys
import threading
import time
from itertools import islice
from types import SimpleNamespace
Expand All @@ -17,7 +15,7 @@
__all__ = ['batch_iterator', 'yaml',
'load_contrib_module',
'parse_arg',
'PathImporter', 'random_port', 'random_identity', 'expand_env_var',
'PathImporter', 'random_port', 'get_random_identity', 'expand_env_var',
'colored', 'kwargs2list', 'valid_yaml_path']


Expand Down Expand Up @@ -250,8 +248,8 @@ def register_port(port: int, stack_id: int = JINA_GLOBAL.stack.id):
yaml.dump(_all, fp)


def random_identity() -> str:
return '%s-%s-%s' % (os.getpid(), threading.get_ident(), ''.join(random.choices(string.ascii_lowercase, k=5)))
def get_random_identity() -> str:
return '%010x' % random.getrandbits(40)


yaml = _get_yaml()
Expand Down Expand Up @@ -406,15 +404,15 @@ def valid_yaml_path(path: str, to_stream: bool = False):
return open(path, encoding='utf8')
else:
return path
elif path.lower() in {'route', 'merge', 'clear', 'logroute'}:
elif path.lower() in {'route', 'merge', 'clear', 'logroute', 'forward'}:
from pkg_resources import resource_filename
return resource_filename('jina', '/'.join(('resources', 'executors.%s.yml' % path)))
elif path.startswith('!'):
# possible YAML content
return io.StringIO(path)
elif path.isidentifier():
# possible class name
return io.StringIO('!%s' % path)
return io.StringIO(f'!{path}')
else:
raise FileNotFoundError('%s can not be resolved, it should be a readable stream,'
' or a valid file path, or a supported class name.' % path)
21 changes: 13 additions & 8 deletions jina/main/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,22 @@ def set_flow_parser(parser=None):

def set_pea_parser(parser=None):
from ..enums import SocketType
from ..helper import random_port, random_identity
from ..helper import random_port, get_random_identity
from .. import __default_host__
import os
if not parser:
parser = set_base_parser()

gp0 = add_arg_group(parser, 'pea basic arguments')
gp0.add_argument('--name', type=str,
help='the name of this pod, used to identify the pod and its logs.')
gp0.add_argument('--identity', type=str, default=random_identity(),
help='the identity of the pod, default a random string')
help='the name of this pea, used to identify the pod and its logs.')
gp0.add_argument('--identity', type=str, default=get_random_identity(),
help='the identity of the sockets, default a random string')
gp0.add_argument('--yaml_path', type=str, default='BaseExecutor',
help='the yaml config of the executor, it should be a readable stream,'
' or a valid file path, or a supported class name.') # pod(no use) -> pea
help='the yaml config of the executor, it could be '
'a) a YAML file path, '
'b) a supported executor\'s class name, '
'c) the content of YAML config (must starts with "!")') # pod(no use) -> pea

gp1 = add_arg_group(parser, 'pea container arguments')
gp1.add_argument('--image', type=str,
Expand Down Expand Up @@ -191,7 +193,7 @@ def set_pea_parser(parser=None):


def set_pod_parser(parser=None):
from ..enums import ReplicaType
from ..enums import ReplicaType, SchedulerType
if not parser:
parser = set_base_parser()
set_pea_parser(parser)
Expand All @@ -202,8 +204,11 @@ def set_pod_parser(parser=None):
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary')
gp4.add_argument('--replica_type', type=ReplicaType.from_string, choices=list(ReplicaType),
default=ReplicaType.PUSH_NONBLOCK,
default=ReplicaType.ONEOF,
help='replica type of the concurrent peas')
gp4.add_argument('--scheduling', type=ReplicaType.from_string, choices=list(SchedulerType),
default=SchedulerType.LOAD_BALANCE,
help='the strategy of scheduling workload among peas')
gp4.add_argument('--shutdown_idle', action='store_true', default=False,
help='shutdown this pod when all peas are idle')
return parser
Expand Down
5 changes: 3 additions & 2 deletions jina/peapods/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .grpc_asyncio import AsyncioExecutor
from .zmq import AsyncZmqlet, add_envelope
from .. import __stop_msg__
from ..excepts import WaitPendingMessage, RequestLoopEnd, NoDriverForRequest, BadRequestType
from ..excepts import NoExplicitMessage, RequestLoopEnd, NoDriverForRequest, BadRequestType
from ..executors import BaseExecutor
from ..logging.base import get_logger
from ..main.parser import set_pea_parser, set_pod_parser
Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(self, args):
def recv_callback(self, msg):
try:
return self.executor(msg.__class__.__name__)
except WaitPendingMessage:
except NoExplicitMessage:
self.logger.error('gateway should not receive partial message, it can not do reduce')
except RequestLoopEnd:
self.logger.error('event loop end signal should not be raised in the gateway')
Expand Down Expand Up @@ -149,3 +149,4 @@ async def Spawn(self, request, context):
def close(self):
for p in self.peapods:
p.close()

Loading

0 comments on commit 84ff5b8

Please sign in to comment.