Skip to content

Commit

Permalink
feat(pod): add load balancing as one scheduler
Browse files (browse the repository at this point in the history)
Signed-off-by: Han Xiao <[email protected]>
  • Loading branch information
hanxiao committed Apr 5, 2020
1 parent 964aa7d commit db1234c
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 4 deletions.
2 changes: 1 addition & 1 deletion jina/clients/python/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ def __exit__(self, exc_type, exc_val, exc_tb):
speed = self.proc_doc / elapsed
else:
speed = self.num_bars / elapsed
sys.stdout.write('\t%s\n' % colored(f'done in {elapsed:3.1f}s @ {speed:3.1f}/s', 'green'))
sys.stdout.write('\t%s\n' % colored(f'done in {elapsed:3.1f}s 🐎 {speed:3.1f}/s', 'green'))
2 changes: 1 addition & 1 deletion jina/main/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def set_pod_parser(parser=None):
gp4.add_argument('--replica_type', type=ReplicaType.from_string, choices=list(ReplicaType),
default=ReplicaType.ONEOF,
help='replica type of the concurrent peas')
gp4.add_argument('--scheduling', type=ReplicaType.from_string, choices=list(SchedulerType),
gp4.add_argument('--scheduling', type=SchedulerType.from_string, choices=list(SchedulerType),
default=SchedulerType.LOAD_BALANCE,
help='the strategy of scheduling workload among peas')
gp4.add_argument('--shutdown_idle', action='store_true', default=False,
Expand Down
4 changes: 2 additions & 2 deletions jina/resources/executors.forward.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ requests:
on:
[SearchRequest, TrainRequest, IndexRequest]:
- !ForwardDriver {}
ControlRequest:
- !ControlReqDriver {}
ControlRequest:
- !ControlReqDriver {}
58 changes: 58 additions & 0 deletions tests/test_loadbalance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import time

import numpy as np

from jina.drivers.helper import array2blob
from jina.enums import FlowOptimizeLevel, SchedulerType
from jina.executors.crafters import BaseDocCrafter
from jina.flow import Flow
from jina.proto import jina_pb2
from tests import JinaTestCase


def random_docs(num_docs, chunks_per_doc=5, embed_dim=10):
    """Lazily yield ``num_docs`` documents filled with random chunk embeddings.

    :param num_docs: number of documents to generate
    :param chunks_per_doc: number of chunks added to every document
    :param embed_dim: length of the random vector stored in each chunk
    :return: a generator of ``jina_pb2.Document`` objects

    Chunk ids are consecutive across the whole stream (0, 1, 2, ...), and
    every chunk records the index of the document it belongs to in ``doc_id``.
    """
    for doc_idx in range(num_docs):
        doc = jina_pb2.Document()
        # Ids are handed out consecutively, so the id of the first chunk of
        # this document is fully determined by the document index.
        first_chunk_id = doc_idx * chunks_per_doc
        for offset in range(chunks_per_doc):
            chunk = doc.chunks.add()
            vector = np.random.random([embed_dim])
            chunk.embedding.CopyFrom(array2blob(vector))
            chunk.chunk_id = first_chunk_id + offset
            chunk.doc_id = doc_idx
        yield doc


class SlowWorker(BaseDocCrafter):
    """A crafter that is artificially slow in some processes.

    Whether an instance is slow is decided once, at construction time, from
    the parity of the current process id: odd pids are slow, even pids are
    fast. A slow instance sleeps for one second on every :meth:`craft` call.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base class and decide slowness."""
        super().__init__(*args, **kwargs)
        # Workers running in a process with an odd pid are the slow ones.
        self.is_slow = os.getpid() % 2 != 0
        # Only slow workers announce themselves; logging this from every
        # worker would make the log useless for telling them apart.
        if self.is_slow:
            self.logger.warning('im a slow worker')

    def craft(self, doc_id, *args, **kwargs):
        """Return the document id unchanged, after a delay if this worker is slow.

        :param doc_id: id of the document being crafted
        :return: a dict with the single key ``'doc_id'``
        """
        if self.is_slow:
            self.logger.warning('slowly doing')
            time.sleep(1)
        return {'doc_id': doc_id}


class MyTestCase(JinaTestCase):
    """Smoke tests indexing through a pod of ``SlowWorker`` replicas.

    Neither test makes an explicit assertion; each passes when the flow can
    be built, entered and used for indexing without raising.
    """

    def _index_through_slow_workers(self, **pod_kwargs):
        """Index 100 random documents through a 10-replica ``SlowWorker`` pod.

        :param pod_kwargs: extra keyword arguments passed on to ``Flow.add``
        """
        flow = Flow(runtime='process',
                    optimize_level=FlowOptimizeLevel.IGNORE_GATEWAY)
        built = flow.add(name='sw',
                         yaml_path='SlowWorker',
                         replicas=10,
                         **pod_kwargs).build()
        with built:
            built.index(raw_bytes=random_docs(100), in_proto=True, batch_size=10)

    def test_lb(self):
        """Run with the pod's default scheduling (no ``scheduling`` given)."""
        self._index_through_slow_workers()

    def test_roundrobin(self):
        """Run with round-robin scheduling requested explicitly."""
        self._index_through_slow_workers(scheduling=SchedulerType.ROUND_ROBIN)

0 comments on commit db1234c

Please sign in to comment.