From e17ed793d032f74127360bfad0d0620f17124dd7 Mon Sep 17 00:00:00 2001
From: Joan Fontanals Martinez
Date: Fri, 29 Sep 2023 10:08:05 +0200
Subject: [PATCH 1/2] fix: fix issue with same doc type in spawned processes

---
 .../runtimes/gateway/http_fastapi_app_docarrayv2.py |  3 ++-
 jina/serve/stream/__init__.py                       | 11 ++++++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py b/jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
index 8f99f11b62136..517365303769f 100644
--- a/jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
+++ b/jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
@@ -200,7 +200,8 @@ async def post(body: input_model, response: Response):
             else:
                 docs = DocList[input_doc_list_model]([data])
             if body.header is None:
-                req_id = docs[0].id
+                if hasattr(docs[0], 'id'):
+                    req_id = docs[0].id

             try:
                 async for resp in streamer.stream_docs(
diff --git a/jina/serve/stream/__init__.py b/jina/serve/stream/__init__.py
index 21685388bd4aa..f7b064bd0ace4 100644
--- a/jina/serve/stream/__init__.py
+++ b/jina/serve/stream/__init__.py
@@ -85,7 +85,6 @@ async def _get_endpoints_input_output_models(
             connection_pool, retry_forever=True, is_cancel=is_cancel
         )
         self.logger.debug(f'Got all endpoints from TopologyGraph {endpoints}')
-
         if endpoints is not None:
             for endp in endpoints:
                 for origin_node in topology_graph.origin_nodes:
@@ -102,6 +101,16 @@
                         and len(leaf_input_output_model) > 0
                     ):
                         _endpoints_models_map[endp] = leaf_input_output_model[0]
+        cached_models = {}
+        for k, v in _endpoints_models_map.items():
+            if v['input'].__name__ not in cached_models:
+                cached_models[v['input'].__name__] = v['input']
+            else:
+                v['input'] = cached_models[v['input'].__name__]
+            if v['output'].__name__ not in cached_models:
+                cached_models[v['output'].__name__] = v['output']
+            else:
+                v['output'] = cached_models[v['output'].__name__]
         return _endpoints_models_map

     async def stream_doc(
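Note: the cache added in _get_endpoints_input_output_models deduplicates each endpoint's input/output models by class name. Per the commit message, duplicates arise when the same doc type is defined in separately spawned processes, leaving the gateway with distinct pydantic classes that share a __name__. A minimal standalone sketch of the idea, using pydantic directly rather than Jina's internals (all names here are hypothetical, not from the patch):

    from pydantic import create_model

    # Two structurally identical models built independently, as happens when the
    # same DocArray schema is defined in two separately spawned executor processes:
    ModelA = create_model('MyDoc', text=(str, ...))
    ModelB = create_model('MyDoc', text=(str, ...))
    assert ModelA is not ModelB and ModelA.__name__ == ModelB.__name__

    # Normalize by name, as the patch does: the first class seen for a given
    # __name__ wins, and later duplicates are replaced by it.
    endpoints_models_map = {'/encode': {'input': ModelA, 'output': ModelB}}
    cached_models = {}
    for v in endpoints_models_map.values():
        for key in ('input', 'output'):
            v[key] = cached_models.setdefault(v[key].__name__, v[key])
    assert endpoints_models_map['/encode']['input'] is endpoints_models_map['/encode']['output']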
From 1c4b9d3909b76bafec65704c4a0cd0966f608031 Mon Sep 17 00:00:00 2001
From: Joan Fontanals Martinez
Date: Fri, 29 Sep 2023 11:36:33 +0200
Subject: [PATCH 2/2] test: add test

---
 .github/workflows/cd.yml                           |  1 +
 .github/workflows/ci.yml                           |  1 +
 .../docarray_v2/docker/__init__.py                 |  0
 .../docarray_v2/docker/executor1/Dockerfile        |  7 +++
 .../docarray_v2/docker/executor1/__init__.py       |  0
 .../docarray_v2/docker/executor1/config.yml        |  5 ++
 .../docarray_v2/docker/executor1/executor.py       | 23 +++++++
 .../docarray_v2/docker/executor2/Dockerfile        |  7 +++
 .../docarray_v2/docker/executor2/__init__.py       |  0
 .../docarray_v2/docker/executor2/config.yml        |  5 ++
 .../docarray_v2/docker/executor2/executor.py       | 39 ++++++++++++
 .../docarray_v2/docker/test_with_docker.py         | 61 +++++++++++++++++++
 12 files changed, 149 insertions(+)
 create mode 100644 tests/integration/docarray_v2/docker/__init__.py
 create mode 100644 tests/integration/docarray_v2/docker/executor1/Dockerfile
 create mode 100644 tests/integration/docarray_v2/docker/executor1/__init__.py
 create mode 100644 tests/integration/docarray_v2/docker/executor1/config.yml
 create mode 100644 tests/integration/docarray_v2/docker/executor1/executor.py
 create mode 100644 tests/integration/docarray_v2/docker/executor2/Dockerfile
 create mode 100644 tests/integration/docarray_v2/docker/executor2/__init__.py
 create mode 100644 tests/integration/docarray_v2/docker/executor2/config.yml
 create mode 100644 tests/integration/docarray_v2/docker/executor2/executor.py
 create mode 100644 tests/integration/docarray_v2/docker/test_with_docker.py

diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml
index 32141eba4e9d3..e438e7ba2cb28 100644
--- a/.github/workflows/cd.yml
+++ b/.github/workflows/cd.yml
@@ -147,6 +147,7 @@ jobs:
           pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
           pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
           pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
+          pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
           echo "flag it as jina for codeoverage"
           echo "codecov_flag=jina" >> $GITHUB_OUTPUT
         timeout-minutes: 45
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e19af8a54d120..6eaf4b2d0aab7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -482,6 +482,7 @@ jobs:
           pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_parameters_as_pydantic.py
           pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
           pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/sagemaker/test_sagemaker.py
+          pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/docker
           echo "flag it as jina for codeoverage"
           echo "codecov_flag=jina" >> $GITHUB_OUTPUT
         timeout-minutes: 45
diff --git a/tests/integration/docarray_v2/docker/__init__.py b/tests/integration/docarray_v2/docker/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/tests/integration/docarray_v2/docker/executor1/Dockerfile b/tests/integration/docarray_v2/docker/executor1/Dockerfile
new file mode 100644
index 0000000000000..86e8f8773925d
--- /dev/null
+++ b/tests/integration/docarray_v2/docker/executor1/Dockerfile
@@ -0,0 +1,7 @@
+FROM jinaai/jina:test-pip
+
+COPY . /executor_root/
+
+WORKDIR /executor_root
+
+ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
diff --git a/tests/integration/docarray_v2/docker/executor1/__init__.py b/tests/integration/docarray_v2/docker/executor1/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/tests/integration/docarray_v2/docker/executor1/config.yml b/tests/integration/docarray_v2/docker/executor1/config.yml
new file mode 100644
index 0000000000000..e376cf0a6049a
--- /dev/null
+++ b/tests/integration/docarray_v2/docker/executor1/config.yml
@@ -0,0 +1,5 @@
+jtype: Encoder
+metas:
+  name: EncoderPrivate
+  py_modules:
+    - executor.py
diff --git a/tests/integration/docarray_v2/docker/executor1/executor.py b/tests/integration/docarray_v2/docker/executor1/executor.py
new file mode 100644
index 0000000000000..6d1696987e405
--- /dev/null
+++ b/tests/integration/docarray_v2/docker/executor1/executor.py
@@ -0,0 +1,23 @@
+from typing import Optional
+from docarray import DocList, BaseDoc
+from docarray.typing import NdArray
+from jina import Executor, requests
+import numpy as np
+
+class MyDoc(BaseDoc):
+    text: str
+    embedding: Optional[NdArray] = None
+
+
+class Encoder(Executor):
+    def __init__(
+        self,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+
+    @requests
+    def encode(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
+        for doc in docs:
+            doc.embedding = np.random.random(128)
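The Encoder fills embeddings in place, so it can be sanity-checked in-process before the image is built. A hedged sketch (it assumes executor1's module is importable as executor; the integration test below runs it from the built 'encoder-executor' image instead):

    from docarray import DocList
    from executor import Encoder, MyDoc  # executor1/executor.py shown above

    enc = Encoder()
    docs = DocList[MyDoc]([MyDoc(text='hello')])
    enc.encode(docs)  # mutates docs, assigning a random 128-d embedding to each
    assert docs[0].embedding.shape == (128,)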
diff --git a/tests/integration/docarray_v2/docker/executor2/Dockerfile b/tests/integration/docarray_v2/docker/executor2/Dockerfile
new file mode 100644
index 0000000000000..86e8f8773925d
--- /dev/null
+++ b/tests/integration/docarray_v2/docker/executor2/Dockerfile
@@ -0,0 +1,7 @@
+FROM jinaai/jina:test-pip
+
+COPY . /executor_root/
+
+WORKDIR /executor_root
+
+ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
diff --git a/tests/integration/docarray_v2/docker/executor2/__init__.py b/tests/integration/docarray_v2/docker/executor2/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/tests/integration/docarray_v2/docker/executor2/config.yml b/tests/integration/docarray_v2/docker/executor2/config.yml
new file mode 100644
index 0000000000000..e7c652cf05c00
--- /dev/null
+++ b/tests/integration/docarray_v2/docker/executor2/config.yml
@@ -0,0 +1,5 @@
+jtype: Indexer
+metas:
+  name: IndexerPrivate
+  py_modules:
+    - executor.py
diff --git a/tests/integration/docarray_v2/docker/executor2/executor.py b/tests/integration/docarray_v2/docker/executor2/executor.py
new file mode 100644
index 0000000000000..4cadc028ec1ef
--- /dev/null
+++ b/tests/integration/docarray_v2/docker/executor2/executor.py
@@ -0,0 +1,39 @@
+from typing import Optional, List
+from docarray import DocList, BaseDoc
+from docarray.typing import NdArray
+from docarray.index import InMemoryExactNNIndex
+from jina import Executor, requests
+
+
+class MyDoc(BaseDoc):
+    text: str
+    embedding: Optional[NdArray] = None
+
+
+class MyDocWithMatches(MyDoc):
+    matches: DocList[MyDoc] = []
+    scores: List[float] = []
+
+
+class Indexer(Executor):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._indexer = InMemoryExactNNIndex[MyDoc]()
+
+    @requests(on='/index')
+    def index(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
+        self._indexer.index(docs)
+        return docs
+
+    @requests(on='/search')
+    def search(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDocWithMatches]:
+        res = DocList[MyDocWithMatches]()
+        ret = self._indexer.find_batched(docs, search_field='embedding')
+        matched_documents = ret.documents
+        matched_scores = ret.scores
+        for query, matches, scores in zip(docs, matched_documents, matched_scores):
+            output_doc = MyDocWithMatches(**query.dict())
+            output_doc.matches = matches
+            output_doc.scores = scores.tolist()
+            res.append(output_doc)
+        return res
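Likewise, find_batched returns batched matches and scores, which the search endpoint zips back onto each query doc. An in-process sketch under the same assumption (executor2's module importable as executor, query embeddings matching the Encoder's 128 dimensions):

    import numpy as np
    from docarray import DocList
    from executor import Indexer, MyDoc  # executor2/executor.py shown above

    idx = Indexer()
    docs = DocList[MyDoc](
        [MyDoc(text=f'doc {i}', embedding=np.random.random(128)) for i in range(3)]
    )
    idx.index(docs)
    results = idx.search(DocList[MyDoc]([docs[0]]))
    assert len(results) == 1 and len(results[0].matches) == 3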
diff --git a/tests/integration/docarray_v2/docker/test_with_docker.py b/tests/integration/docarray_v2/docker/test_with_docker.py
new file mode 100644
index 0000000000000..815c4f794764c
--- /dev/null
+++ b/tests/integration/docarray_v2/docker/test_with_docker.py
@@ -0,0 +1,61 @@
+import os
+import time
+
+import pytest
+import requests as general_requests
+
+from jina import Flow
+
+cur_dir = os.path.dirname(os.path.abspath(__file__))
+
+
+@pytest.fixture
+def executor_images_built():
+    import docker
+
+    client = docker.from_env()
+    client.images.build(path=os.path.join(cur_dir, 'executor1'), tag='encoder-executor')
+    client.images.build(path=os.path.join(cur_dir, 'executor2'), tag='indexer-executor')
+    client.close()
+    yield
+    time.sleep(2)
+    client = docker.from_env()
+    client.containers.prune()
+
+
+@pytest.mark.parametrize('protocol', ['http', 'grpc'])
+def test_flow_with_docker(executor_images_built, protocol):
+    from docarray import BaseDoc, DocList
+    from typing import Optional, List
+    from docarray.typing import NdArray
+
+    class MyDoc(BaseDoc):
+        text: str
+        embedding: Optional[NdArray] = None
+
+    class MyDocWithMatches(MyDoc):
+        matches: DocList[MyDoc] = []
+        scores: List[float] = []
+
+    f = Flow(protocol=protocol).add(uses='docker://encoder-executor').add(uses='docker://indexer-executor')
+
+    with f:
+        if protocol == 'http':
+            resp = general_requests.get(f'http://localhost:{f.port}/openapi.json')
+            resp.json()
+
+        sentences = ['This framework generates embeddings for each input sentence',
+                     'Sentences are passed as a list of string.',
+                     'The quick brown fox jumps over the lazy dog.']
+
+        inputs = DocList[MyDoc]([MyDoc(text=sentence) for sentence in sentences])
+        f.post(on='/index', inputs=inputs)
+        queries = inputs[0:2]
+        search_results = f.post(on='/search', inputs=queries, return_type=DocList[MyDocWithMatches])
+
+        assert len(search_results) == len(queries)
+        for result in search_results:
+            assert result.text in sentences
+            assert len(result.matches) == len(sentences)
+            for m in result.matches:
+                assert m.text in sentences
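Both Docker images define the same MyDoc schema in separate processes, which appears to be exactly the duplicate-model situation the first commit's cache addresses; the /openapi.json request in the http case checks that the gateway can still build its FastAPI schemas. Running the test locally requires a Docker daemon and the docker Python package; stripped of the flakiness and coverage flags used in the CI entries above, the invocation reduces to:

    pytest -v -s tests/integration/docarray_v2/docker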