Skip to content

Commit

Permalink
feat(peapod): add logs grouping by pod name
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiao committed May 5, 2020
1 parent 97ab83a commit 153e2aa
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 21 deletions.
6 changes: 6 additions & 0 deletions docs/chapters/envs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,9 @@ Here is the list of environment variables that ``jina`` respects during runtime.
If set, then all pretrained model-related tests will be conducted in the unit test.

:default: unset

.. confval:: JINA_POD_NAME

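The name of the Pod, set automatically when a Pod is started with ``--log-with-own-name`` off (the default). It is used as the common log prefix for everything running in that Pod, and it should not be set manually by users.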

:default: unset
4 changes: 3 additions & 1 deletion jina/drivers/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ def __call__(self, *args, **kwargs):
self.is_pollin_paused = True
else:
raise RuntimeError('if this router connects more than one dealer, '
'then this error should never be raised')
'then this error should never be raised. often when it '
'is raised, some Pods must fail to start, so please go '
'up and check the first error message in the log')
elif self.req.command == jina_pb2.Request.ControlRequest.IDLE:
self.idle_dealer_ids.add(self.envelope.receiver_id)
self.logger.debug(f'{self.envelope.receiver_id} is idle')
Expand Down
10 changes: 10 additions & 0 deletions jina/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,13 @@ class FlowBuildLevel(BetterEnum):
"""
EMPTY = 0 #: Nothing is built
GRAPH = 1 #: The underlying graph is built, you may visualize the flow


class PeaRoleType(BetterEnum):
    """ The enum of a Pea role, i.e. the part a Pea plays inside its Pod
    """
    REPLICA = 0  #: The Pea is one of the replicas of the Pod
    HEAD = 1  #: The Pea is the head of the Pod
    TAIL = 2  #: The Pea is the tail of the Pod
    SHARD = 3  #: The Pea is a shard -- NOTE(review): confirm where this role is assigned
8 changes: 5 additions & 3 deletions jina/logging/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class JsonFormatter(Formatter):
"""Format the log message as a JSON object so that it can be later used/parsed in browser with javascript. """

KEYS = {'created', 'filename', 'funcName', 'levelname', 'lineno', 'msg',
'module', 'name', 'pathname', 'process', 'thread'} #: keys to extract from the log
'module', 'name', 'pathname', 'process', 'thread', 'processName',
'threadName'} #: keys to extract from the log

def format(self, record):
cr = copy(record)
Expand Down Expand Up @@ -161,11 +162,12 @@ def get_logger(context: str, context_len: int = 15,
from .. import __uptime__
from .queue import __sse_queue__, __profile_queue__, __log_queue__
if not fmt_str:
title = os.environ.get('JINA_POD_NAME', context)
if 'JINA_LOG_LONG' in os.environ:
fmt_str = f'{context[:context_len]:>{context_len}}@%(process)2d' \
fmt_str = f'{title[:context_len]:>{context_len}}@%(process)2d' \
f'[%(levelname).1s][%(filename).3s:%(funcName).3s:%(lineno)3d]:%(message)s'
else:
fmt_str = f'{context[:context_len]:>{context_len}}@%(process)2d' \
fmt_str = f'{title[:context_len]:>{context_len}}@%(process)2d' \
f'[%(levelname).1s]:%(message)s'

timed_fmt_str = f'%(asctime)s:' + fmt_str
Expand Down
12 changes: 6 additions & 6 deletions jina/main/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def _gaa(parser):
'--host-out', '--socket-in', '--socket-out', '--port-ctrl', '--ctrl-with-ipc', '--timeout',
'--timeout-ctrl', '--timeout-ready', '--dump-interval', '--exit-no-dump', '--read-only',
'--separated-workspace', '--replica-id', '--check-version', '--array-in-pb', '--compress-hwm',
'--compress-lwm', '--num-part', '--memory-hwm', '--runtime', '--max-idle-time', '--log-sse',
'--log-remote', '--log-profile', '--override-exec-log', '--host', '--port-grpc', '--max-message-size',
'--compress-lwm', '--num-part', '--role', '--memory-hwm', '--runtime', '--max-idle-time', '--log-sse',
'--log-remote', '--log-profile', '--log-with-own-name', '--host', '--port-grpc', '--max-message-size',
'--proxy'],
'hello-world': ['--help', '--version', '--version-full', '--workdir', '--logserver', '--shards', '--replicas',
'--index-yaml-path', '--index-data-url', '--index-batch-size', '--query-yaml-path',
Expand All @@ -61,17 +61,17 @@ def _gaa(parser):
'--host-out', '--socket-in', '--socket-out', '--port-ctrl', '--ctrl-with-ipc', '--timeout',
'--timeout-ctrl', '--timeout-ready', '--dump-interval', '--exit-no-dump', '--read-only',
'--separated-workspace', '--replica-id', '--check-version', '--array-in-pb', '--compress-hwm',
'--compress-lwm', '--num-part', '--memory-hwm', '--runtime', '--max-idle-time', '--log-sse',
'--log-remote', '--log-profile', '--override-exec-log', '--host', '--port-grpc', '--max-message-size',
'--compress-lwm', '--num-part', '--role', '--memory-hwm', '--runtime', '--max-idle-time', '--log-sse',
'--log-remote', '--log-profile', '--log-with-own-name', '--host', '--port-grpc', '--max-message-size',
'--proxy', '--replicas', '--polling', '--scheduling', '--reducing-yaml-path', '--shutdown-idle'],
'check': ['--help', '--version', '--version-full', '--summary-exec', '--summary-driver'],
'gateway': ['--help', '--version', '--version-full', '--name', '--identity', '--yaml-path', '--py-modules',
'--image', '--entrypoint', '--pull-latest', '--volumes', '--port-in', '--port-out', '--host-in',
'--host-out', '--socket-in', '--socket-out', '--port-ctrl', '--ctrl-with-ipc', '--timeout',
'--timeout-ctrl', '--timeout-ready', '--dump-interval', '--exit-no-dump', '--read-only',
'--separated-workspace', '--replica-id', '--check-version', '--array-in-pb', '--compress-hwm',
'--compress-lwm', '--num-part', '--memory-hwm', '--runtime', '--max-idle-time', '--log-sse',
'--log-remote', '--log-profile', '--override-exec-log', '--host', '--port-grpc',
'--compress-lwm', '--num-part', '--role', '--memory-hwm', '--runtime', '--max-idle-time',
'--log-sse', '--log-remote', '--log-profile', '--log-with-own-name', '--host', '--port-grpc',
'--max-message-size', '--proxy', '--prefetch', '--prefetch-on-recv', '--allow-spawn'],
'ping': ['--help', '--version', '--version-full', '--timeout', '--retries', '--print-response'],
'client': ['--help', '--version', '--version-full', '--host', '--port-grpc', '--max-message-size', '--proxy',
Expand Down
12 changes: 8 additions & 4 deletions jina/main/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def set_flow_parser(parser=None):


def set_pea_parser(parser=None):
from ..enums import SocketType
from ..enums import SocketType, PeaRoleType
from ..helper import random_port, get_random_identity
from .. import __default_host__
import os
Expand Down Expand Up @@ -222,14 +222,16 @@ def set_pea_parser(parser=None):
'compression, and will be sent. Otherwise, it will send the original message without compression')
gp5.add_argument('--num-part', type=int, default=1,
help='wait until the number of parts of message are all received')
gp5.add_argument('--role', type=PeaRoleType.from_string, choices=list(PeaRoleType),
help='the role of this pea in a pod')

gp6 = add_arg_group(parser, 'pea EXPERIMENTAL arguments')
gp6.add_argument('--memory-hwm', type=int, default=-1,
help='memory high watermark of this pod in Gigabytes, pod will restart when this is reached. '
'-1 means no restriction')
gp6.add_argument('--runtime', type=str, choices=['thread', 'process'], default='process',
help='the parallel runtime of the pod')
gp5.add_argument('--max-idle-time', type=int, default=60,
gp6.add_argument('--max-idle-time', type=int, default=60,
help='label this pea as inactive when it does not '
'process any request after certain time (in second)')

Expand All @@ -240,8 +242,10 @@ def set_pea_parser(parser=None):
help='turn on remote logging')
gp7.add_argument('--log-profile', action='store_true', default=False,
help='turn on the profiling logger')
gp7.add_argument('--override-exec-log', action='store_true', default=False,
help='turn on to allow the override of the executor logger by the pea logger')
gp7.add_argument('--log-with-own-name', action='store_true', default=False,
help='turn on to let each logger outputs in its own name (i.e. parent class name as the context), '
'by default it is off so all logs from the same pod will have the same prefix. '
'turn on to help debugging, turn off to have more clear logs and better grouping in dashboard')
_set_grpc_parser(parser)
return parser

Expand Down
15 changes: 11 additions & 4 deletions jina/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import argparse
import multiprocessing
import os
import threading
import time
from collections import defaultdict
Expand All @@ -14,6 +15,7 @@
from .zmq import send_ctrl_message, Zmqlet
from .. import __ready_msg__, __stop_msg__
from ..drivers.helper import routes2str, add_route
from ..enums import PeaRoleType
from ..excepts import NoExplicitMessage, ExecutorFailToLoad, MemoryOverHighWatermark, UnknownControlCommand, \
RequestLoopEnd, \
DriverNotInstalled, NoDriverForRequest
Expand Down Expand Up @@ -107,7 +109,7 @@ def __init__(self, args: Union['argparse.Namespace', Dict]):
"""
super().__init__()
self.args = args
self.name = self.__class__.__name__
self.name = self.__class__.__name__ #: this is the process name
self.daemon = True

self.is_ready = _get_event(self)
Expand All @@ -133,9 +135,16 @@ def __init__(self, args: Union['argparse.Namespace', Dict]):
if isinstance(args, argparse.Namespace):
if args.name:
self.name = args.name
if args.replica_id > 0:
if args.role == PeaRoleType.HEAD:
self.name = '%s-head' % self.name
elif args.role == PeaRoleType.TAIL:
self.name = '%s-tail' % self.name
elif args.role == PeaRoleType.REPLICA:
self.name = '%s-%d' % (self.name, args.replica_id)
self.ctrl_addr, self.ctrl_with_ipc = Zmqlet.get_ctrl_address(args)
if not args.log_with_own_name and args.name:
# everything in this Pea (process) will use the same name for display the log
os.environ['JINA_POD_NAME'] = args.name
self.logger = get_logger(self.name, **vars(args))
else:
self.logger = get_logger(self.name)
Expand Down Expand Up @@ -223,8 +232,6 @@ def load_executor(self):
try:
self.executor = BaseExecutor.load_config(self.args.yaml_path,
self.args.separated_workspace, self.args.replica_id)
if self.args.override_exec_log:
self.executor.logger = self.logger
self.executor.attach(pea=self)
# self.logger = get_logger('%s(%s)' % (self.name, self.executor.name), **vars(self.args))
except FileNotFoundError:
Expand Down
9 changes: 6 additions & 3 deletions jina/peapods/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def start(self):
start_rep_id = 0
for idx, _args in enumerate(self.peas_args['peas'], start=start_rep_id):
_args.replica_id = idx
_args.role = PeaRoleType.REPLICA
p = Pea(_args, allow_remote=False)
self.peas.append(p)
self.stack.enter_context(p)
Expand Down Expand Up @@ -421,7 +422,8 @@ def _copy_to_head_args(args, is_push: bool, as_router: bool = True):
_head_args.yaml_path = '_forward'

if as_router:
_head_args.name = (args.name or '') + '-head'
_head_args.name = args.name or ''
_head_args.role = PeaRoleType.HEAD

# head and tail never run in docker, reset their image to None
_head_args.image = None
Expand All @@ -437,7 +439,8 @@ def _copy_to_tail_args(args, num_part: int, as_router: bool = True):
_tail_args.socket_in = SocketType.PULL_BIND
if as_router:
_tail_args.yaml_path = args.reducing_yaml_path
_tail_args.name = (args.name or '') + '-tail'
_tail_args.name = args.name or ''
_tail_args.role = PeaRoleType.TAIL
_tail_args.num_part = num_part

# head and tail never run in docker, reset their image to None
Expand All @@ -450,7 +453,7 @@ def _fill_in_host(bind_args, connect_args):

bind_local = (bind_args.host == '0.0.0.0')
bind_docker = (bind_args.image is not None and bind_args.image)
conn_tail = (connect_args.name is not None and connect_args.name.endswith('-tail'))
conn_tail = (connect_args.name is not None and connect_args.role == PeaRoleType.TAIL)
conn_local = (connect_args.host == '0.0.0.0')
conn_docker = (connect_args.image is not None and connect_args.image)
bind_conn_same_remote = not bind_local and not conn_local and (bind_args.host == connect_args.host)
Expand Down

0 comments on commit 153e2aa

Please sign in to comment.