Commit
feat(executors): add docs
nan-wang committed May 19, 2020
1 parent e57293f commit ff89b38
Showing 3 changed files with 64 additions and 20 deletions.
29 changes: 25 additions & 4 deletions jina/executors/clients.py
@@ -3,22 +3,43 @@


class BaseClientExecutor(BaseExecutor):
"""
:class:`BaseClientExecutor` is the base class for executors that wrap up a client to another server.
"""
def __init__(self, host=None, port=None, timeout=-1, *args, **kwargs):
"""
:param host: the host address of the server
:param port: the host port of the server
:param timeout: waiting time in seconds before dropping the request, 200 by default
"""
super().__init__(*args, **kwargs)
self.host = host
self.port = port
self.timeout = timeout if timeout >= 0 else 200


class TFServingClientExecutor(BaseClientExecutor):
def __init__(self, service_name, input_name, output_name, signature_name='serving_default', *args, **kwargs):
class BaseTFServingClientExecutor(BaseClientExecutor):
"""
:class:`BaseTFServingClientExecutor` is the base class for executors that wrap up a tf serving client. For the
sake of generality, this implementation depends on :mod:`tensorflow_serving`.
"""
def __init__(self, service_name, signature_name='serving_default', *args, **kwargs):
"""
:param service_name: the name of the tf serving service
:param signature_name: the name of the tf serving signature
"""
super().__init__(*args, **kwargs)
self.service_name = service_name
self.input_name = input_name
self.output_name = output_name
self.signature_name = signature_name

def post_init(self):
"""
Initialize the channel and stub for the gRPC client
"""
from tensorflow_serving.apis import prediction_service_pb2_grpc
self._channel = grpc.insecure_channel('{}:{}'.format(self.host, self.port))
self._stub = prediction_service_pb2_grpc.PredictionServiceStub(self._channel)
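The rest of the class is collapsed in this diff. For context, a request for the stub created above is typically assembled with the public tensorflow_serving API roughly as follows; this is a minimal, hypothetical sketch rather than the collapsed code, and the model name, input key, and output key are placeholders:

import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

# build a PredictRequest for a model served under `service_name`
request = predict_pb2.PredictRequest()
request.model_spec.name = 'my_model'                       # placeholder model name
request.model_spec.signature_name = 'serving_default'
request.inputs['x'].CopyFrom(
    tf.make_tensor_proto([[1.0, 2.0]], dtype=tf.float32))  # placeholder input key

# call the server through a stub like the one created in post_init()
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = stub.Predict(request, 200)                       # 200s timeout, matching the default above
values = response.outputs['y'].float_val                    # placeholder output key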
53 changes: 38 additions & 15 deletions jina/executors/encoders/clients.py
@@ -1,11 +1,33 @@
from typing import Any
import numpy as np

from ..clients import TFServingClientExecutor
from ..clients import BaseTFServingClientExecutor
from . import BaseEncoder


class BaseTFServingEncoder(TFServingClientExecutor, BaseEncoder):
class BaseTFServingEncoder(BaseTFServingClientExecutor, BaseEncoder):
"""
:class:`BaseTFServingEncoder` is the base class for encoders that wrap up a tf serving client. The client calls
the gRPC port of the tf server.
To implement your own executor with `tfserving`:
.. highlight:: python
.. code-block:: python
class MyAwesomeTFServingEncoder(BaseTFServingEncoder):
def get_input(self, data):
input_1 = data[:, 0]
input_2 = data[:, 1:]
return {
'my_input_1': input_1.reshape(-1, 1).astype(np.float32),
'my_input_2': input_2.astype(np.float32)
}
def get_output(self, response):
return np.array(response.result().outputs['output_feature'].float_val)
"""
def encode(self, data: Any, *args, **kwargs) -> Any:
_req = self.get_request(data)
_rsp = self._stub.Predict.future(_req, self.timeout)
@@ -14,21 +14,22 @@ def encode(self, data: Any, *args, **kwargs) -> Any:


class UnaryTFServingEncoder(BaseTFServingEncoder):
"""
:class:`UnaryTFServingEncoder` is an encoder that wraps up a tf serving client. This client covers the simplest
case, in which both the request and the response have a single data field.
"""
def __init__(self, input_name, output_name, *args, **kwargs):
"""
:param input_name: the name of data field in the request
:param output_name: the name of data field in the response
"""
super().__init__(*args, **kwargs)
self.input_name = input_name
self.output_name = output_name

def get_input(self, data):
return {self.input_name: data.astype(np.float32)}

def get_output(self, response):
return np.array(response.result().outputs[self.output_name].float_val)
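Putting the pieces together, the encoder can be used along the lines of the test further down; a usage sketch (host, port, and service_name mirror the test, while the field names and the input shape are placeholders):

import numpy as np

encoder = UnaryTFServingEncoder(
    host='0.0.0.0', port='8500', service_name='mnist',
    input_name='images', output_name='scores')  # placeholder field names
# each call sends `data` as the single input tensor of the request
embeddings = encoder.encode(np.random.rand(2, 784).astype(np.float32))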


# class BertTFServingEncoder(BaseTFServingEncoder):
# def get_input(self, data):
# token_id, mask_id, pos_id = tokenizer.tokenize(data)
# return {
# 'token_id': token_id,
# 'mask_id': mask_id,
# 'pos_id': pos_id
# }
#
# def get_output(self, response):
# return np.array(response.result().outputs[0].float_val)
2 changes: 1 addition & 1 deletion tests/executors/encoders/clients.py
@@ -5,7 +5,7 @@


class MyTestCase(JinaTestCase):
# @unittest.skip('add grpc mocking for this test')
@unittest.skip('add grpc mocking for this test')
def test_something(self):
encoder = UnaryTFServingEncoder(
host='0.0.0.0', port='8500', service_name='mnist',
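The skip message above asks for gRPC mocking. One way that could look, assuming `encode()` only needs `_stub.Predict.future(...).result().outputs[...]` from the stub and that tensorflow is installed for building the request; the field names below are placeholders and this sketch is not part of the commit:

from unittest import mock
import numpy as np

encoder = UnaryTFServingEncoder(
    host='0.0.0.0', port='8500', service_name='mnist',
    input_name='images', output_name='scores')  # placeholder field names
# replace the real stub with a mock so no tf serving server is required
encoder._stub = mock.Mock()
encoder._stub.Predict.future.return_value.result.return_value.outputs = {
    'scores': mock.Mock(float_val=[0.0] * 10)}
vec = encoder.encode(np.zeros((1, 784), dtype=np.float32))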
