From ff89b38b66394203874a6b732458d6fada5e9824 Mon Sep 17 00:00:00 2001 From: Nan Wang Date: Tue, 19 May 2020 21:28:18 +0800 Subject: [PATCH] feat(executors): add docs --- jina/executors/clients.py | 29 +++++++++++++--- jina/executors/encoders/clients.py | 53 +++++++++++++++++++++-------- tests/executors/encoders/clients.py | 2 +- 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/jina/executors/clients.py b/jina/executors/clients.py index 90c001705bb49..e97abf0675d15 100644 --- a/jina/executors/clients.py +++ b/jina/executors/clients.py @@ -3,22 +3,43 @@ class BaseClientExecutor(BaseExecutor): + """ + :class:`BaseClientExecutor` is the base class for the executors that wrap up a client to another server. + + """ def __init__(self, host=None, port=None, timeout=-1, *args, **kwargs): + """ + :param host: the host address of the server + :param port: the host port of the server + :param timeout: waiting time in seconds before dropping the request, by default 200 + """ super().__init__(*args, **kwargs) self.host = host self.port = port self.timeout = timeout if timeout >= 0 else 200 -class TFServingClientExecutor(BaseClientExecutor): - def __init__(self, service_name, input_name, output_name, signature_name='serving_default', *args, **kwargs): +class BaseTFServingClientExecutor(BaseClientExecutor): + """ + :class:`BaseTFServingClientExecutor` is the base class for the executors that wrap up a tf serving client. For the + sake of generality, this implementation has a dependency on :mod:`tensorflow_serving`. 
+ + def __init__(self, service_name, signature_name='serving_default', *args, **kwargs): + """ + :param service_name: the name of the tf serving service + :param signature_name: the name of the tf serving signature + + """ super().__init__(*args, **kwargs) self.service_name = service_name - self.input_name = input_name - self.output_name = output_name self.signature_name = signature_name def post_init(self): + """ + Initialize the channel and stub for the gRPC client + + """ from tensorflow_serving.apis import prediction_service_pb2_grpc self._channel = grpc.insecure_channel('{}:{}'.format(self.host, self.port)) self._stub = prediction_service_pb2_grpc.PredictionServiceStub(self._channel) diff --git a/jina/executors/encoders/clients.py b/jina/executors/encoders/clients.py index f2315c5485977..e5ccd6fddac72 100644 --- a/jina/executors/encoders/clients.py +++ b/jina/executors/encoders/clients.py @@ -1,11 +1,33 @@ from typing import Any import numpy as np -from ..clients import TFServingClientExecutor +from ..clients import BaseTFServingClientExecutor from . import BaseEncoder -class BaseTFServingEncoder(TFServingClientExecutor, BaseEncoder): +class BaseTFServingEncoder(BaseTFServingClientExecutor, BaseEncoder): + """ + :class:`BaseTFServingEncoder` is the base class for the encoders that wrap up a tf serving client. The client calls + the gRPC port of the tf server. + + To implement your own executor with `tfserving`, + + .. highlight:: python + .. 
code-block:: python + + class MyAwesomeTFServingEncoder(BaseTFServingEncoder): + def get_input(self, data): + input_1 = data[:, 0] + input_2 = data[:, 1:] + return { + 'my_input_1': input_1.reshape(-1, 1).astype(np.float32), + 'my_input_2': input_2.astype(np.float32) + } + + def get_output(self, response): + return np.array(response.result().outputs['output_feature'].float_val) + + """ def encode(self, data: Any, *args, **kwargs) -> Any: _req = self.get_request(data) _rsp = self._stub.Predict.future(_req, self.timeout) @@ -14,21 +36,22 @@ def encode(self, data: Any, *args, **kwargs) -> Any: class UnaryTFServingEncoder(BaseTFServingEncoder): + """ + :class:`UnaryTFServingEncoder` is an encoder that wraps up a tf serving client. This client covers the simplest + case, in which both the request and the response have a single data field. + + """ + def __init__(self, input_name, output_name, *args, **kwargs): + """ + :param input_name: the name of the data field in the request + :param output_name: the name of the data field in the response + """ + super().__init__(*args, **kwargs) + self.input_name = input_name + self.output_name = output_name + def get_input(self, data): return {self.input_name: data.astype(np.float32)} def get_output(self, response): return np.array(response.result().outputs[self.output_name].float_val) - - -# class BertTFServingEncoder(TFServingEncoder): -# def _get_input(self, data): -# token_id, mask_id, pos_id = tokenizer.tokenize(data) -# return { -# 'token_id': token_id, -# 'mask_id': mask_id, -# 'pos_id': pos_id -# } -# -# def _get_output(self, response): -# return np.array(response.result().outputs[0].float_val) diff --git a/tests/executors/encoders/clients.py b/tests/executors/encoders/clients.py index b9ca802b125b7..d21447e7a2388 100644 --- a/tests/executors/encoders/clients.py +++ b/tests/executors/encoders/clients.py @@ -5,7 +5,7 @@ class MyTestCase(JinaTestCase): - # @unittest.skip('add grpc mocking for this test') + @unittest.skip('add grpc 
mocking for this test') def test_something(self): encoder = UnaryTFServingEncoder( host='0.0.0.0', port='8500', service_name='mnist',