Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
feat(flow): flow can not export docker swarm config
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Oct 15, 2019
1 parent 27b5427 commit 8a60c26
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 28 deletions.
113 changes: 85 additions & 28 deletions gnes/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,31 @@ class Flow(TrainableBase):
"""

_service2parser = {
Service.Encoder: set_encoder_parser,
Service.Router: set_router_parser,
Service.Indexer: set_indexer_parser,
Service.Frontend: set_frontend_parser,
Service.Preprocessor: set_preprocessor_parser,
}
_service2builder = {
Service.Encoder: lambda x: ServiceManager(EncoderService, x),
Service.Router: lambda x: ServiceManager(RouterService, x),
Service.Indexer: lambda x: ServiceManager(IndexerService, x),
Service.Preprocessor: lambda x: ServiceManager(PreprocessorService, x),
Service.Frontend: FrontendService,
_service_map = {
Service.Encoder: {
'parser': set_encoder_parser,
'builder': lambda x: ServiceManager(EncoderService, x),
'cmd': 'encode'},
Service.Router: {
'parser': set_router_parser,
'builder': lambda x: ServiceManager(RouterService, x),
'cmd': 'route',
},
Service.Indexer: {
'parser': set_indexer_parser,
'builder': lambda x: ServiceManager(IndexerService, x),
'cmd': 'index'
},
Service.Frontend: {
'parser': set_frontend_parser,
'builder': FrontendService,
'cmd': 'frontend'
},
Service.Preprocessor: {
'parser': set_preprocessor_parser,
'builder': lambda x: ServiceManager(PreprocessorService, x),
'cmd': 'preprocess'
}
}

class BuildLevel(BetterEnum):
Expand All @@ -125,7 +137,7 @@ def __init__(self, with_frontend: bool = True, is_trained: bool = True, *args, *
self.logger = set_logger(self.__class__.__name__)
self._service_nodes = OrderedDict()
self._service_edges = {}
self._service_name_counter = {k: 0 for k in Flow._service2parser.keys()}
self._service_name_counter = {k: 0 for k in Flow._service_map.keys()}
self._service_contexts = []
self._last_changed_service = []
self._common_kwargs = kwargs
Expand All @@ -142,9 +154,49 @@ def __init__(self, with_frontend: bool = True, is_trained: bool = True, *args, *
self.logger.warning('with_frontend is set to False, you need to add_frontend() by yourself')

@_build_level(BuildLevel.GRAPH)
def to_swarm_yaml(self) -> str:
swarm_yml = ''
return swarm_yml
def to_k8s_yaml(self) -> str:
raise NotImplementedError

@_build_level(BuildLevel.GRAPH)
def to_shell_script(self) -> str:
raise NotImplementedError

@_build_level(BuildLevel.GRAPH)
def to_swarm_yaml(self, image: str = 'gnes/gnes:latest-alpine') -> str:
"""
Generate the docker swarm YAML compose file
:param image: the default GNES docker image
:return: the generated YAML compose file
"""
from ruamel.yaml import YAML, StringIO
_yaml = YAML()
swarm_yml = {'version': '3.4',
'services': {}}

for k, v in self._service_nodes.items():
defaults_kwargs, _ = Flow._service_map[v['service']]['parser']().parse_known_args(
['--yaml_path', 'TrainableBase'])
non_default_kwargs = {k: v for k, v in vars(v['parsed_args']).items() if getattr(defaults_kwargs, k) != v}
if not isinstance(non_default_kwargs.get('yaml_path', ''), str):
non_default_kwargs['yaml_path'] = v['kwargs']['yaml_path']

num_replicas = None
if 'num_parallel' in non_default_kwargs:
num_replicas = non_default_kwargs.pop('num_parallel')

swarm_yml['services'][k] = {
'image': v['kwargs'].get('image', image),
'command': '%s %s' % (
Flow._service_map[v['service']]['cmd'],
' '.join(['--%s %s' % (k, v) for k, v in non_default_kwargs.items()]))
}
if num_replicas and num_replicas > 1:
swarm_yml['services'][k]['deploy'] = {'replicas': num_replicas}

stream = StringIO()
_yaml.dump(swarm_yml, stream)
return stream.getvalue().strip()

def to_python_code(self, indent: int = 4) -> str:
"""
Expand Down Expand Up @@ -355,7 +407,7 @@ def query(self, bytes_gen: Iterator[bytes] = None, **kwargs):

@_build_level(BuildLevel.RUNTIME)
def _call_client(self, bytes_gen: Iterator[bytes] = None, **kwargs):
args, p_args = self._get_parsed_args(self, set_client_cli_parser, kwargs)
args, p_args, unk_args = self._get_parsed_args(self, set_client_cli_parser, kwargs)
p_args.grpc_port = self._service_nodes[self._frontend]['parsed_args'].grpc_port
p_args.grpc_host = self._service_nodes[self._frontend]['parsed_args'].grpc_host
c = CLIClient(p_args, start_at_init=False)
Expand Down Expand Up @@ -473,10 +525,13 @@ def set(self, name: str, recv_from: Union[str, Tuple[str], List[str], 'Service']
if not clear_old_attr:
node['kwargs'].update(kwargs)
kwargs = node['kwargs']
args, p_args = op_flow._get_parsed_args(op_flow, Flow._service2parser[service], kwargs)
node['args'] = args
node['parsed_args'] = p_args
node['kwargs'] = kwargs
args, p_args, unk_args = op_flow._get_parsed_args(op_flow, Flow._service_map[service]['parser'], kwargs)
node.update({
'args': args,
'parsed_args': p_args,
'kwargs': kwargs,
'unk_args': unk_args
})

if as_last_service:
op_flow.set_last_service(name, False)
Expand Down Expand Up @@ -548,8 +603,8 @@ def add(self, service: Union['Service', str],
if isinstance(service, str):
service = Service.from_string(service)

if service not in Flow._service2parser:
raise ValueError('service: %s is not supported, should be one of %s' % (service, Flow._service2parser))
if service not in Flow._service_map:
raise ValueError('service: %s is not supported, should be one of %s' % (service, Flow._service_map.keys()))

if name in op_flow._service_nodes:
raise FlowTopologyError('name: %s is used in this Flow already!' % name)
Expand All @@ -567,15 +622,17 @@ def add(self, service: Union['Service', str],
recv_from = op_flow._parse_service_endpoints(op_flow, name, recv_from, connect_to_last_service=True)
send_to = op_flow._parse_service_endpoints(op_flow, name, send_to, connect_to_last_service=False)

args, p_args = op_flow._get_parsed_args(op_flow, Flow._service2parser[service], kwargs)
args, p_args, unk_args = op_flow._get_parsed_args(op_flow, Flow._service_map[service]['parser'], kwargs)

op_flow._service_nodes[name] = {
'service': service,
'parsed_args': p_args,
'args': args,
'incomes': recv_from,
'outgoings': send_to,
'kwargs': kwargs}
'kwargs': kwargs,
'unk_args': unk_args
}

# direct all income services' output to the current service
for s in recv_from:
Expand Down Expand Up @@ -638,7 +695,7 @@ def _get_parsed_args(op_flow, service_arg_parser, kwargs):
except SystemExit:
raise ValueError('bad arguments for service "%s", '
'you may want to double check your args "%s"' % (service_arg_parser, args))
return args, p_args
return args, p_args, unknown_args

def _build_graph(self, copy_flow: bool) -> 'Flow':
op_flow = copy.deepcopy(self) if copy_flow else self
Expand Down Expand Up @@ -748,7 +805,7 @@ def build(self, backend: Optional[str] = 'thread', copy_flow: bool = False, *arg
# for thread and process backend which runs locally, host_in and host_out should not be set
p_args.host_in = BaseService.default_host
p_args.host_out = BaseService.default_host
op_flow._service_contexts.append((Flow._service2builder[v['service']], p_args))
op_flow._service_contexts.append((Flow._service_map[v['service']]['builder'], p_args))
op_flow._build_level = Flow.BuildLevel.RUNTIME
else:
raise NotImplementedError('backend=%s is not supported yet' % backend)
Expand Down
2 changes: 2 additions & 0 deletions tests/test_gnes_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,5 @@ def test_flow_add_set(self):
f1.dump(self.dump_flow_path)
f3 = Flow.load(self.dump_flow_path)
self.assertEqual(f1, f3)

print(f1.to_swarm_yaml())

0 comments on commit 8a60c26

Please sign in to comment.