Skip to content

Commit

Permalink
fix: handle dry_run like data request
Browse files Browse the repository at this point in the history
Signed-off-by: Han Xiao <[email protected]>
  • Loading branch information
hanxiao committed Apr 6, 2020
1 parent 32b032e commit 6797344
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 20 deletions.
4 changes: 2 additions & 2 deletions jina/drivers/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from . import BaseDriver
from ..excepts import UnknownControlCommand, RequestLoopEnd, NoExplicitMessage
from ..proto import jina_pb2
from ..proto import jina_pb2, is_data_request


class ControlReqDriver(BaseDriver):
Expand Down Expand Up @@ -63,7 +63,7 @@ def __init__(self, *args, **kwargs):
self.is_pollin_paused = False

def __call__(self, *args, **kwargs):
if not isinstance(self.req, jina_pb2.Request.ControlRequest):
if is_data_request(self.req):
self.logger.debug(self.idle_dealer_ids)
if self.idle_dealer_ids:
dealer_id = self.idle_dealer_ids.pop()
Expand Down
2 changes: 1 addition & 1 deletion jina/logging/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def success(self, msg: str, **kwargs):
sys.stdout.write('W:%s:%s' % (self.context, self._planify(msg)))


def get_logger(context: str, context_len: int = 10,
def get_logger(context: str, context_len: int = 15,
log_profile: bool = False,
log_sse: bool = False,
log_remote: bool = False,
Expand Down
7 changes: 4 additions & 3 deletions jina/main/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ def set_pea_parser(parser=None):
help='the identity of the sockets, default a random string')
gp0.add_argument('--yaml_path', type=str, default='BaseExecutor',
help='the yaml config of the executor, it could be '
'a) a YAML file path, '
'b) a supported executor\'s class name, '
'c) the content of YAML config (must starts with "!")') # pod(no use) -> pea
'> a YAML file path, '
'> a supported executor\'s class name, '
'> one of "_clear", "_route", "_forward", "_logroute", "_merge" '
'> the content of YAML config (must starts with "!")') # pod(no use) -> pea

gp1 = add_arg_group(parser, 'pea container arguments')
gp1.add_argument('--image', type=str,
Expand Down
6 changes: 2 additions & 4 deletions jina/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from ..executors import BaseExecutor
from ..logging import get_logger
from ..logging.profile import used_memory, TimeDict
from ..proto import jina_pb2
from ..proto import jina_pb2, is_data_request

__all__ = ['PeaMeta', 'BasePea']

Expand Down Expand Up @@ -112,9 +112,7 @@ def handle(self, msg: 'jina_pb2.Message') -> 'BasePea':
self._message = msg
req_type = type(self._request)

if self.args.num_part > 1 and (req_type != jina_pb2.Request.ControlRequest
or (req_type == jina_pb2.Request.ControlRequest
and self._request.command == jina_pb2.Request.ControlRequest.DRYRUN)):
if self.args.num_part > 1 and is_data_request(self._request):
# do gathering, not for control request, unless it is dryrun
req_id = msg.envelope.request_id
self._pending_msgs[req_id].append(msg)
Expand Down
4 changes: 2 additions & 2 deletions jina/peapods/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def _copy_to_head_args(args, is_push: bool, as_router: bool = True):
_head_args.yaml_path = '_route'
else:
raise NotImplementedError
_head_args.name = '_head'
_head_args.name = (args.name or '') + '-head'
if is_push:
if args.scheduling == SchedulerType.ROUND_ROBIN:
_head_args.socket_out = SocketType.PUSH_BIND
Expand All @@ -423,7 +423,7 @@ def _copy_to_tail_args(args, num_part: int, as_router: bool = True):
_tail_args.socket_in = SocketType.PULL_BIND
if as_router:
_tail_args.yaml_path = '_forward'
_tail_args.name = '_tail'
_tail_args.name = (args.name or '') + '-tail'
_tail_args.num_part = num_part
return _tail_args

Expand Down
12 changes: 4 additions & 8 deletions jina/peapods/zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ..helper import colored, get_random_identity
from ..logging import default_logger
from ..logging.base import get_logger
from ..proto import jina_pb2
from ..proto import jina_pb2, is_data_request

if False:
# fix type-hint complain for sphinx and flake
Expand Down Expand Up @@ -170,14 +170,10 @@ def send_message(self, msg: 'jina_pb2.Message'):
_req = getattr(msg.request, msg.request.WhichOneof('body'))
_req_type = type(_req)

if _req_type == jina_pb2.Request.ControlRequest:
if _req.command == jina_pb2.Request.ControlRequest.DRYRUN:
# pass this control message to the next
o_sock = self.out_sock
else:
o_sock = self.ctrl_sock
else:
if is_data_request(_req):
o_sock = self.out_sock
else:
o_sock = self.ctrl_sock

self.bytes_sent += send_message(o_sock, msg, **self.send_recv_kwargs)
self.msg_sent += 1
Expand Down
12 changes: 12 additions & 0 deletions jina/proto/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
"""
The :mod:`jina.proto` defines the protobuf used in jina. It is the core message protocol used in communicating between :class:`jina.peapods.base.BasePod`. It also defines the interface of a gRPC service.
"""

from .jina_pb2 import Request


def is_data_request(req: 'Request') -> bool:
"""check if the request is data request
DRY_RUN is a ControlRequest but considered as data request
"""
req_type = type(req)
return req_type != Request.ControlRequest or (req_type == Request.ControlRequest
and req.command == Request.ControlRequest.DRYRUN)

0 comments on commit 6797344

Please sign in to comment.