diff --git a/jina/drivers/control.py b/jina/drivers/control.py index 59eee9cde2dbf..6d4e0d0c60cfd 100644 --- a/jina/drivers/control.py +++ b/jina/drivers/control.py @@ -2,7 +2,7 @@ from . import BaseDriver from ..excepts import UnknownControlCommand, RequestLoopEnd, NoExplicitMessage -from ..proto import jina_pb2 +from ..proto import jina_pb2, is_data_request class ControlReqDriver(BaseDriver): @@ -63,7 +63,7 @@ def __init__(self, *args, **kwargs): self.is_pollin_paused = False def __call__(self, *args, **kwargs): - if not isinstance(self.req, jina_pb2.Request.ControlRequest): + if is_data_request(self.req): self.logger.debug(self.idle_dealer_ids) if self.idle_dealer_ids: dealer_id = self.idle_dealer_ids.pop() diff --git a/jina/logging/base.py b/jina/logging/base.py index 06146a6b0dd93..b1debe6979755 100644 --- a/jina/logging/base.py +++ b/jina/logging/base.py @@ -133,7 +133,7 @@ def success(self, msg: str, **kwargs): sys.stdout.write('W:%s:%s' % (self.context, self._planify(msg))) -def get_logger(context: str, context_len: int = 10, +def get_logger(context: str, context_len: int = 15, log_profile: bool = False, log_sse: bool = False, log_remote: bool = False, diff --git a/jina/main/parser.py b/jina/main/parser.py index 8a451d2b51a86..e45e97432b299 100644 --- a/jina/main/parser.py +++ b/jina/main/parser.py @@ -100,9 +100,10 @@ def set_pea_parser(parser=None): help='the identity of the sockets, default a random string') gp0.add_argument('--yaml_path', type=str, default='BaseExecutor', help='the yaml config of the executor, it could be ' - 'a) a YAML file path, ' - 'b) a supported executor\'s class name, ' - 'c) the content of YAML config (must starts with "!")') # pod(no use) -> pea + '> a YAML file path, ' + '> a supported executor\'s class name, ' + '> one of "_clear", "_route", "_forward", "_logroute", "_merge" ' + '> the content of YAML config (must starts with "!")') # pod(no use) -> pea gp1 = add_arg_group(parser, 'pea container arguments') gp1.add_argument('--image', type=str, diff --git a/jina/peapods/pea.py b/jina/peapods/pea.py index da242f5beb071..e5f25a96fcfd0 100644 --- a/jina/peapods/pea.py +++ b/jina/peapods/pea.py @@ -17,7 +17,7 @@ from ..executors import BaseExecutor from ..logging import get_logger from ..logging.profile import used_memory, TimeDict -from ..proto import jina_pb2 +from ..proto import jina_pb2, is_data_request __all__ = ['PeaMeta', 'BasePea'] @@ -112,9 +112,7 @@ def handle(self, msg: 'jina_pb2.Message') -> 'BasePea': self._message = msg req_type = type(self._request) - if self.args.num_part > 1 and (req_type != jina_pb2.Request.ControlRequest - or (req_type == jina_pb2.Request.ControlRequest - and self._request.command == jina_pb2.Request.ControlRequest.DRYRUN)): + if self.args.num_part > 1 and is_data_request(self._request): # do gathering, not for control request, unless it is dryrun req_id = msg.envelope.request_id self._pending_msgs[req_id].append(msg) diff --git a/jina/peapods/pod.py b/jina/peapods/pod.py index 778004560b315..0ce3922270a2a 100644 --- a/jina/peapods/pod.py +++ b/jina/peapods/pod.py @@ -403,7 +403,7 @@ def _copy_to_head_args(args, is_push: bool, as_router: bool = True): _head_args.yaml_path = '_route' else: raise NotImplementedError - _head_args.name = '_head' + _head_args.name = (args.name or '') + '-head' if is_push: if args.scheduling == SchedulerType.ROUND_ROBIN: _head_args.socket_out = SocketType.PUSH_BIND @@ -423,7 +423,7 @@ def _copy_to_tail_args(args, num_part: int, as_router: bool = True): _tail_args.socket_in = SocketType.PULL_BIND if as_router: _tail_args.yaml_path = '_forward' - _tail_args.name = '_tail' + _tail_args.name = (args.name or '') + '-tail' _tail_args.num_part = num_part return _tail_args diff --git a/jina/peapods/zmq.py b/jina/peapods/zmq.py index e766a7c28e1f9..56854c26ec816 100644 --- a/jina/peapods/zmq.py +++ b/jina/peapods/zmq.py @@ -14,7 +14,7 @@ from ..helper import colored, get_random_identity from ..logging import default_logger from ..logging.base import get_logger -from ..proto import jina_pb2 +from ..proto import jina_pb2, is_data_request if False: # fix type-hint complain for sphinx and flake @@ -170,14 +170,10 @@ def send_message(self, msg: 'jina_pb2.Message'): _req = getattr(msg.request, msg.request.WhichOneof('body')) _req_type = type(_req) - if _req_type == jina_pb2.Request.ControlRequest: - if _req.command == jina_pb2.Request.ControlRequest.DRYRUN: - # pass this control message to the next - o_sock = self.out_sock - else: - o_sock = self.ctrl_sock - else: + if is_data_request(_req): o_sock = self.out_sock + else: + o_sock = self.ctrl_sock self.bytes_sent += send_message(o_sock, msg, **self.send_recv_kwargs) self.msg_sent += 1 diff --git a/jina/proto/__init__.py b/jina/proto/__init__.py index 88ee48ad429fb..452c6f2a5211e 100644 --- a/jina/proto/__init__.py +++ b/jina/proto/__init__.py @@ -1,3 +1,15 @@ """ The :mod:`jina.proto` defines the protobuf used in jina. It is the core message protocol used in communicating between :class:`jina.peapods.base.BasePod`. It also defines the interface of a gRPC service. """ + +from .jina_pb2 import Request + + +def is_data_request(req: 'Request') -> bool: + """check if the request is data request + + DRY_RUN is a ControlRequest but considered as data request + """ + req_type = type(req) + return req_type != Request.ControlRequest or (req_type == Request.ControlRequest + and req.command == Request.ControlRequest.DRYRUN)