From 5949a195ecedc6ab53b9bb3e7005709233f273a9 Mon Sep 17 00:00:00 2001 From: Artem Shelkovnikov Date: Fri, 22 Dec 2023 15:20:37 +0100 Subject: [PATCH 1/5] Replace more places accessing Elasticsearch client directly with proxied calls --- connectors/cli/connector.py | 21 ++---- connectors/cli/job.py | 6 +- connectors/es/client.py | 108 ++++++++++++++++++++++++++++--- connectors/es/settings.py | 10 +++ connectors/es/sink.py | 116 ++++++++++------------------------ connectors/kibana.py | 89 ++++++-------------------- connectors/preflight_check.py | 3 +- tests/es/test_client.py | 36 ++++++++++- tests/test_kibana.py | 10 +-- tests/test_preflight_check.py | 4 +- tests/test_sink.py | 100 +++++++++++------------------ 11 files changed, 254 insertions(+), 249 deletions(-) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index a964a60ca..362031daa 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -2,7 +2,7 @@ from collections import OrderedDict from connectors.es.client import ESManagementClient -from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings +from connectors.es.settings import DEFAULT_LANGUAGE from connectors.protocol import ( CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX, @@ -30,7 +30,8 @@ async def list_connectors(self): # TODO move this on top try: await self.es_management_client.ensure_exists( - indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], + expand_wildcards="all", ) return [ @@ -87,25 +88,15 @@ async def __create( await self.es_management_client.close() async def __create_search_index(self, index_name, language): - mappings = Mappings.default_text_fields_mappings( - is_connectors_index=True, - ) - - settings = Settings(language_code=language, analysis_icu=False).to_hash() - - settings["auto_expand_replicas"] = "0-3" - settings["number_of_shards"] = 2 - - await self.es_management_client.client.indices.create( - index=index_name, mappings=mappings, settings=settings - ) + await self.es_management_client.create_content_index(index_name, language) async def __create_connector( self, index_name, service_type, configuration, is_native, language, from_index ): try: await self.es_management_client.ensure_exists( - indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], + expand_wildcards="all", ) timestamp = iso_utc() diff --git a/connectors/cli/job.py b/connectors/cli/job.py index 2a41cad05..f721498e4 100644 --- a/connectors/cli/job.py +++ b/connectors/cli/job.py @@ -37,7 +37,8 @@ def job(self, job_id): async def __async_job(self, job_id): try: await self.es_management_client.ensure_exists( - indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], + expand_wildcards="all", ) job = await self.sync_job_index.fetch_by_id(job_id) return job @@ -63,7 +64,8 @@ async def __async_start(self, connector_id, job_type): async def __async_list_jobs(self, connector_id, index_name, job_id): try: await self.es_management_client.ensure_exists( - indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], + expand_wildcards="all", ) jobs = self.sync_job_index.get_all_docs( query=self.__job_list_query(connector_id, index_name, job_id), diff --git a/connectors/es/client.py b/connectors/es/client.py index 9bee8b0cd..f680db756 100644 --- a/connectors/es/client.py +++ b/connectors/es/client.py @@ -11,9 +11,16 @@ from elastic_transport.client_utils import url_to_node_config from elasticsearch import ApiError, AsyncElasticsearch, ConflictError -from elasticsearch import ConnectionError as ElasticConnectionError +from elasticsearch import ( + ConnectionError as ElasticConnectionError, +) +from elasticsearch import ( + NotFoundError as ElasticNotFoundError, +) +from elasticsearch.helpers import async_scan from connectors import __version__ +from connectors.es.settings import TIMESTAMP_FIELD, Mappings, Settings from connectors.logger import logger, set_extra_logger from connectors.utils import CancellableSleeps @@ -180,18 +187,71 @@ def __init__(self, config): # initialize ESIndex instance super().__init__(config) - async def ensure_exists(self, indices=None): + async def ensure_exists(self, indices=None, expand_wildcards="open"): if indices is None: indices = [] for index in indices: - logger.debug(f"Checking index {index}") - if not await self.client.indices.exists(index=index): + logger.debug( + f"Checking index {index} with expand_wildcards={expand_wildcards}" + ) + if not await self.client.indices.exists( + index=index, expand_wildcards=expand_wildcards + ): await self.client.indices.create(index=index) logger.debug(f"Created index {index}") - async def delete_indices(self, indices): - await self.client.indices.delete(index=indices, ignore_unavailable=True) + async def create_content_index(self, search_index_name, language_code): + settings = Settings(language_code=language_code, analysis_icu=False).to_hash() + mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) + + return await self.client.indices.create( + index=search_index_name, mappings=mappings, settings=settings + ) + + async def ensure_content_index_mappings(self, index, mappings): + # open = Match open, non-hidden indices. Also matches any non-hidden data stream. + # Content indices are always non-hidden. + expand_wildcards = "open" + response = await self.client.indices.get_mapping( + index=index, expand_wildcards=expand_wildcards + ) + + existing_mappings = response[index].get("mappings", {}) + if len(existing_mappings) == 0 and mappings: + logger.debug( + "Index %s has no mappings or it's empty. Adding mappings...", index + ) + await self.client.indices.put_mapping( + index=index, + dynamic=mappings.get("dynamic", False), + dynamic_templates=mappings.get("dynamic_templates", []), + properties=mappings.get("properties", {}), + expand_wildcards=expand_wildcards, + ) + logger.debug("Successfully added mappings for index %s", index) + else: + logger.debug( + "Index %s already has mappings, skipping mappings creation", index + ) + + async def ensure_ingest_pipeline_exists( + self, pipeline_id, version, description, processors + ): + try: + await self.client.ingest.get_pipeline(id=pipeline_id) + except ElasticNotFoundError: + await self.client.ingest.put_pipeline( + id=pipeline_id, + version=version, + description=version, + processors=version, + ) + + async def delete_indices(self, indices, expand_wildcards="open"): + await self.client.indices.delete( + index=indices, expand_wildcards=expand_wildcards, ignore_unavailable=True + ) async def clean_index(self, index_name): return await self.client.delete_by_query( @@ -201,8 +261,40 @@ async def clean_index(self, index_name): async def list_indices(self): return await self.client.indices.stats(index="search-*") - async def index_exists(self, index_name): - return await self.client.indices.exists(index=index_name) + async def index_exists(self, index_name, expand_wildcards="open"): + return await self.client.indices.exists( + index=index_name, expand_wildcards=expand_wildcards + ) + + async def upsert(self, _id, index_name, doc): + await self.client.index( + id=_id, + index=index_name, + document=doc, + ) + + async def yield_existing_documents_metadata(self, index): + """Returns an iterator on the `id` and `_timestamp` fields of all documents in an index. + + WARNING + + This function will load all ids in memory -- on very large indices, + depending on the id length, it can be quite large. + + 300,000 ids will be around 50MiB + """ + logger.debug(f"Scanning existing index {index}") + if not self.index_exists(index): + return + + async for doc in async_scan( + client=self.client, index=index, _source=["id", TIMESTAMP_FIELD] + ): + source = doc["_source"] + doc_id = source.get("id", doc["_id"]) + timestamp = source.get(TIMESTAMP_FIELD) + + yield doc_id, timestamp def with_concurrency_control(retries=3): diff --git a/connectors/es/settings.py b/connectors/es/settings.py index 6ba510e36..6f2adb1a6 100644 --- a/connectors/es/settings.py +++ b/connectors/es/settings.py @@ -8,6 +8,8 @@ import yaml +TIMESTAMP_FIELD = "_timestamp" + ENUM_IGNORE_ABOVE = 2048 DATE_FIELD_MAPPING = {"type": "date"} @@ -275,6 +277,14 @@ def analyzer_definitions(self): return definitions + @property + def auto_expand_replicas(self): + return "0-1" + + @property + def number_of_shards(self): + return 2 + def __init__(self, *, language_code=None, analysis_icu=False): self._language_data = None self.language_code = language_code or DEFAULT_LANGUAGE diff --git a/connectors/es/sink.py b/connectors/es/sink.py index 8be41dec1..6533291db 100644 --- a/connectors/es/sink.py +++ b/connectors/es/sink.py @@ -24,13 +24,8 @@ import time from collections import defaultdict -from elasticsearch import ( - NotFoundError as ElasticNotFoundError, -) -from elasticsearch.helpers import async_scan - -from connectors.es import ESClient, Mappings -from connectors.es.settings import Settings +from connectors.es.client import ESManagementClient +from connectors.es.settings import TIMESTAMP_FIELD, Mappings from connectors.filtering.basic_rule import BasicRuleEngine, parse from connectors.logger import logger, tracer from connectors.protocol import Filter, JobType @@ -56,7 +51,6 @@ OP_INDEX = "index" OP_UPSERT = "update" OP_DELETE = "delete" -TIMESTAMP_FIELD = "_timestamp" CANCELATION_TIMEOUT = 5 @@ -73,6 +67,10 @@ class ForceCanceledError(Exception): pass +class ContentIndexDoesNotExistError(Exception): + pass + + class Sink: """Send bulk operations in batches by consuming a queue. @@ -81,7 +79,7 @@ class Sink: Arguments: - - `client` -- an instance of `connectors.es.ESClient` + - `client` -- an instance of `connectors.es.ESManagementClient` - `queue` -- an instance of `asyncio.Queue` to pull docs from - `chunk_size` -- a maximum number of operations to send per request - `pipeline` -- ingest pipeline settings to pass to the bulk API @@ -134,7 +132,7 @@ def _bulk_op(self, doc, operation=OP_INDEX): async def _batch_bulk(self, operations, stats): @retryable(retries=self.max_retires) async def _bulk_api_call(): - return await self.client.bulk( + return await self.client.client.bulk( operations=operations, pipeline=self.pipeline["name"] ) @@ -260,7 +258,7 @@ class Extractor: This class runs a coroutine that puts docs in `queue`, given a document generator. Arguments: - - client: an instance of `connectors.es.ESClient` + - client: an instance of `connectors.es.ESManagementClient` - queue: an `asyncio.Queue` to put docs in - index: the target Elasticsearch index - filter_: an instance of `Filter` to apply on the fetched document -- default: `None` @@ -303,32 +301,6 @@ def __init__( self._logger = logger_ or logger self._canceled = False - async def _get_existing_ids(self): - """Returns an iterator on the `id` and `_timestamp` fields of all documents in an index. - - - WARNING - - This function will load all ids in memory -- on very large indices, - depending on the id length, it can be quite large. - - 300,000 ids will be around 50MiB - """ - self._logger.debug(f"Scanning existing index {self.index}") - try: - await self.client.indices.get(index=self.index) - except ElasticNotFoundError: - return - - async for doc in async_scan( - client=self.client, - index=self.index, - _source=["id", TIMESTAMP_FIELD], - ): - doc_id = doc["_source"].get("id", doc["_id"]) - ts = doc["_source"].get(TIMESTAMP_FIELD) - yield doc_id, ts - async def _deferred_index(self, lazy_download, doc_id, doc, operation): data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD]) @@ -399,7 +371,12 @@ async def get_docs(self, generator): start = time.time() self._logger.info("Collecting local document ids") - existing_ids = {k: v async for (k, v) in self._get_existing_ids()} + existing_ids = { + k: v + async for (k, v) in self.client.yield_existing_documents_metadata( + self.index + ) + } self._logger.debug( f"Found {len(existing_ids)} docs in {self.index} (duration " f"{int(time.time() - start)} seconds) " @@ -534,7 +511,10 @@ async def get_access_control_docs(self, generator): existing_ids = { doc_id: last_update_timestamp - async for (doc_id, last_update_timestamp) in self._get_existing_ids() + async for ( + doc_id, + last_update_timestamp, + ) in self.client.yield_existing_documents_metadata(self.index) } if self._logger.isEnabledFor(logging.DEBUG): @@ -615,7 +595,7 @@ class AsyncBulkRunningError(Exception): pass -class SyncOrchestrator(ESClient): +class SyncOrchestrator: """This class is the sync orchestrator. It does the following in `async_bulk` @@ -629,66 +609,40 @@ class SyncOrchestrator(ESClient): def __init__(self, elastic_config, logger_=None): self._logger = logger_ or logger self._logger.debug(f"SyncOrchestrator connecting to {elastic_config['host']}") - super().__init__(elastic_config) + self.es_management_client = ESManagementClient(elastic_config) self.loop = asyncio.get_event_loop() self._extractor = None self._extractor_task = None self._sink = None self._sink_task = None + async def close(self): + await self.es_management_client.close() + + async def has_active_license_enabled(self, license_): + # TODO: think how to make it not a proxy method to the client + return await self.es_management_client.has_active_license_enabled(license_) + async def prepare_content_index(self, index, language_code=None): """Creates the index, given a mapping if it does not exists.""" self._logger.debug(f"Checking index {index}") - expand_wildcards = "open" - exists = await self.client.indices.exists( - index=index, expand_wildcards=expand_wildcards - ) + exists = await self.es_management_client.index_exists(index) mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) if exists: # Update the index mappings if needed self._logger.debug(f"{index} exists") - await self._ensure_content_index_mappings(index, mappings, expand_wildcards) + await self.es_management_client.ensure_content_index_mappings( + index, mappings + ) else: # Create a new index self._logger.info(f"Creating content index: {index}") - await self._create_content_index( - index=index, language_code=language_code, mappings=mappings - ) + await self.es_management_client.create_content_index(index, language_code) self._logger.info(f"Content index successfully created: {index}") - async def _ensure_content_index_mappings(self, index, mappings, expand_wildcards): - response = await self.client.indices.get_mapping( - index=index, expand_wildcards=expand_wildcards - ) - - existing_mappings = response[index].get("mappings", {}) - if len(existing_mappings) == 0 and mappings: - self._logger.debug( - "Index %s has no mappings or it's empty. Adding mappings...", index - ) - await self.client.indices.put_mapping( - index=index, - dynamic=mappings.get("dynamic", False), - dynamic_templates=mappings.get("dynamic_templates", []), - properties=mappings.get("properties", {}), - expand_wildcards=expand_wildcards, - ) - self._logger.debug("Successfully added mappings for index %s", index) - else: - self._logger.debug( - "Index %s already has mappings, skipping mappings creation", index - ) - - async def _create_content_index(self, index, mappings, language_code=None): - settings = Settings(language_code=language_code, analysis_icu=False).to_hash() - - return await self.client.indices.create( - index=index, mappings=mappings, settings=settings - ) - def done(self): if self._extractor_task is not None and not self._extractor_task.done(): return False @@ -797,7 +751,7 @@ async def async_bulk( # start the fetcher self._extractor = Extractor( - self.client, + self.es_management_client, stream, index, filter_=filter_, @@ -813,7 +767,7 @@ async def async_bulk( # start the bulker self._sink = Sink( - self.client, + self.es_management_client, stream, chunk_size, pipeline, diff --git a/connectors/kibana.py b/connectors/kibana.py index 64787fde9..c37d51a51 100644 --- a/connectors/kibana.py +++ b/connectors/kibana.py @@ -10,11 +10,9 @@ import sys from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser -import elasticsearch - from connectors.config import load_config -from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings -from connectors.es.sink import SyncOrchestrator +from connectors.es.client import ESManagementClient +from connectors.es.settings import DEFAULT_LANGUAGE from connectors.logger import set_extra_logger from connectors.source import get_source_klass from connectors.utils import validate_index_name @@ -50,44 +48,14 @@ async def prepare(service_type, index_name, config, connector_definition=None): klass = get_source_klass(config["sources"][service_type]) - es = SyncOrchestrator(config["elasticsearch"]) - - # add a dummy pipeline - try: - pipeline = await es.client.ingest.get_pipeline( - id="ent-search-generic-ingestion" - ) - except elasticsearch.NotFoundError: - await es.client.ingest.put_pipeline( - id="ent-search-generic-ingestion", - version=DEFAULT_PIPELINE["version"], - description=DEFAULT_PIPELINE["description"], - processors=DEFAULT_PIPELINE["processors"], - ) + es = ESManagementClient(config["elasticsearch"]) - try: - pipeline = await es.client.ingest.get_pipeline( - id="ent-search-generic-ingestion" - ) - except elasticsearch.NotFoundError: - pipeline = { - "description": "My optional pipeline description", - "processors": [ - { - "set": { - "description": "My optional processor description", - "field": "my-keyword-field", - "value": "foo", - } - } - ], - } - - await es.client.ingest.put_pipeline( - id="ent-search-generic-ingestion", - description=pipeline["description"], - processors=pipeline["processors"], - ) + await es.ensure_ingest_pipeline_exists( + "ent-search-generic-ingestion", + DEFAULT_PIPELINE["version"], + DEFAULT_PIPELINE["description"], + DEFAULT_PIPELINE["processors"], + ) try: # https:#github.com/elastic/enterprise-search-team/discussions/2153#discussioncomment-2999765 @@ -183,28 +151,20 @@ async def prepare(service_type, index_name, config, connector_definition=None): } logger.info(f"Prepare {CONNECTORS_INDEX} document") - await es.client.index( - index=CONNECTORS_INDEX, - id=config["connectors"][0]["connector_id"], - document=doc, + await es.upsert( + index_name=CONNECTORS_INDEX, + _id=config["connectors"][0]["connector_id"], + doc=doc, ) logger.info(f"Prepare {index_name}") - mappings = Mappings.default_text_fields_mappings( - is_connectors_index=True, - ) - settings = Settings( - language_code=DEFAULT_LANGUAGE, analysis_icu=False - ).to_hash() - await upsert_index(es, index_name, mappings=mappings, settings=settings) + await upsert_index(es, index_name) logger.info("Done") finally: await es.close() -async def upsert_index( - es, index, docs=None, doc_ids=None, mappings=None, settings=None -): +async def upsert_index(es, index): """Override the index with new mappings and settings. If the index with such name exists, it's deleted and then created again @@ -222,28 +182,15 @@ async def upsert_index( else: expand_wildcards = "open" - exists = await es.client.indices.exists( - index=index, expand_wildcards=expand_wildcards - ) + exists = await es.index_exists(index, expand_wildcards) if exists: logger.debug(f"{index} exists, deleting...") logger.debug("Deleting it first") - await es.client.indices.delete(index=index, expand_wildcards=expand_wildcards) + await es.delete_indices([index], expand_wildcards) logger.debug(f"Creating index {index}") - await es.client.indices.create(index=index, mappings=mappings, settings=settings) - - if docs is None: - return - # TODO: bulk - - if doc_ids is None: - for doc_id, doc in enumerate(docs, start=1): - await es.client.index(index=index, id=doc_id, document=doc) - else: - for doc, doc_id in zip(docs, doc_ids, strict=True): - await es.client.index(index=index, id=doc_id, document=doc) + await es.create_content_index(index, DEFAULT_LANGUAGE) def _parser(): diff --git a/connectors/preflight_check.py b/connectors/preflight_check.py index 34616d8fa..ed6643afb 100644 --- a/connectors/preflight_check.py +++ b/connectors/preflight_check.py @@ -97,7 +97,8 @@ async def _check_system_indices_with_retries(self): # and located here: https://github.com/elastic/elasticsearch/tree/main/x-pack/plugin/core/template-resources/src/main/resources/entsearch/connector await self.es_management_client.ensure_exists( - indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], + expand_wildcards="all", ) return True except Exception as e: diff --git a/tests/es/test_client.py b/tests/es/test_client.py index ff7bef2fe..683d087d6 100644 --- a/tests/es/test_client.py +++ b/tests/es/test_client.py @@ -4,6 +4,7 @@ # you may not use this file except in compliance with the Elastic License 2.0. # import base64 +from datetime import datetime from unittest import mock from unittest.mock import AsyncMock, Mock @@ -16,6 +17,7 @@ License, with_concurrency_control, ) +from tests.commons import AsyncIterator BASIC_CONFIG = {"username": "elastic", "password": "changeme"} API_CONFIG = {"api_key": "foo"} @@ -249,7 +251,9 @@ async def test_index_exists(self): es_management_client.client.indices.exists = AsyncMock() await es_management_client.index_exists(index_name=index_name) - es_management_client.client.indices.exists.assert_awaited_with(index=index_name) + es_management_client.client.indices.exists.assert_awaited_with( + index=index_name, expand_wildcards="open" + ) @pytest.mark.asyncio async def test_delete_indices(self): @@ -265,5 +269,33 @@ async def test_delete_indices(self): await es_management_client.delete_indices(indices=indices) es_management_client.client.indices.delete.assert_awaited_with( - index=indices, ignore_unavailable=True + index=indices, expand_wildcards="open", ignore_unavailable=True ) + + @pytest.mark.asyncio + async def test_yield_existing_documents_metadata(self, mock_responses): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + es_management_client = ESManagementClient(config) + es_management_client.client = AsyncMock() + es_management_client.client.index_exists = Mock(return_value=True) + + records = [ + {"_id": "1", "_source": {"_timestamp": str(datetime.now())}}, + {"_id": "2", "_source": {"_timestamp": str(datetime.now())}}, + ] + + with mock.patch( + "connectors.es.client.async_scan", return_value=AsyncIterator(records) + ): + ids = [] + async for doc_id, _ in es_management_client.yield_existing_documents_metadata( + "something" + ): + ids.append(doc_id) + + assert ids == ["1", "2"] + await es_management_client.close() diff --git a/tests/test_kibana.py b/tests/test_kibana.py index c4f6534c6..c87a40ef6 100644 --- a/tests/test_kibana.py +++ b/tests/test_kibana.py @@ -8,7 +8,7 @@ import pytest -from connectors.es.sink import SyncOrchestrator +from connectors.es.client import ESManagementClient from connectors.kibana import main, upsert_index HERE = os.path.dirname(__file__) @@ -27,7 +27,7 @@ def mock_index_creation(index, mock_responses, hidden=True): headers=headers, ) mock_responses.delete( - url, + f"{url}&ignore_unavailable=true", headers=headers, ) mock_responses.put( @@ -86,7 +86,7 @@ async def test_upsert_index(mock_responses): headers=headers, ) - es = SyncOrchestrator(config) + es = ESManagementClient(config) await upsert_index(es, "search-new-index") @@ -95,7 +95,7 @@ async def test_upsert_index(mock_responses): headers=headers, ) mock_responses.delete( - "http://nowhere.com:9200/search-new-index?expand_wildcards=open", + "http://nowhere.com:9200/search-new-index?expand_wildcards=open&ignore_unavailable=true", headers=headers, ) mock_responses.put( @@ -114,4 +114,4 @@ async def test_upsert_index(mock_responses): headers=headers, ) - await upsert_index(es, "search-new-index", docs=[{"_id": "2"}, {"_id": "3"}]) + await upsert_index(es, "search-new-index") diff --git a/tests/test_preflight_check.py b/tests/test_preflight_check.py index 7f25d0183..d481886ca 100644 --- a/tests/test_preflight_check.py +++ b/tests/test_preflight_check.py @@ -38,7 +38,9 @@ def mock_es_info(mock_responses, healthy=True, repeat=False): def mock_index_exists(mock_responses, index, exist=True, repeat=False): status = 200 if exist else 404 - mock_responses.head(f"{host}/{index}", status=status, repeat=repeat) + mock_responses.head( + f"{host}/{index}?expand_wildcards=all", status=status, repeat=repeat + ) def mock_index(mock_responses, index, doc_id, repeat=False): diff --git a/tests/test_sink.py b/tests/test_sink.py index 01ede3c4e..4465cbf62 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -14,7 +14,7 @@ from elasticsearch import BadRequestError from connectors.es import Mappings -from connectors.es.settings import Settings +from connectors.es.client import ESManagementClient from connectors.es.sink import ( OP_DELETE, OP_INDEX, @@ -68,19 +68,14 @@ async def test_prepare_content_index_raise_error_when_index_creation_failed( f"http://nowhere.com:9200/{index_name}", payload={"_id": "1"}, headers=headers, + status=400, ) es = SyncOrchestrator(config) - with mock.patch.object( - es.client.indices, - "create", - side_effect=[BadRequestError(message="test", body=None, meta=None)], - ): - with pytest.raises(BadRequestError): - await es.prepare_content_index(index_name) - - await es.close() + with pytest.raises(BadRequestError): + await es.prepare_content_index(index_name) + await es.close() @pytest.mark.asyncio @@ -88,6 +83,7 @@ async def test_prepare_content_index_create_index( mock_responses, ): index_name = "search-new-index" + language_code = "jp" config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"} headers = {"X-Elastic-Product": "Elasticsearch"} mock_responses.post( @@ -109,24 +105,16 @@ async def test_prepare_content_index_create_index( create_index_result = asyncio.Future() create_index_result.set_result({"acknowledged": True}) - mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) - - settings = Settings(analysis_icu=False).to_hash() - with mock.patch.object( - es.client.indices, "create", return_value=create_index_result + es.es_management_client, + "create_content_index", + return_value=create_index_result, ) as create_index_mock: - await es.prepare_content_index(index_name) + await es.prepare_content_index(index_name, language_code) await es.close() - expected_params = { - "index": index_name, - "mappings": mappings, - "settings": settings, - } - - create_index_mock.assert_called_with(**expected_params) + create_index_mock.assert_called_with(index_name, language_code) @pytest.mark.asyncio @@ -150,30 +138,20 @@ async def test_prepare_content_index(mock_responses): "http://nowhere.com:9200/search-new-index/_mapping?expand_wildcards=open", headers=headers, payload=mappings, + body='{"acknowledged": True}', ) es = SyncOrchestrator(config) - put_mappings_result = asyncio.Future() - put_mappings_result.set_result({"acknowledged": True}) + index_name = "search-new-index" with mock.patch.object( - es.client.indices, - "put_mapping", - return_value=put_mappings_result, + es.es_management_client, + "ensure_content_index_mappings", ) as put_mapping_mock: - index_name = "search-new-index" await es.prepare_content_index(index_name) await es.close() - expected_params = { - "index": index_name, - "dynamic": mappings["dynamic"], - "dynamic_templates": mappings["dynamic_templates"], - "properties": mappings["properties"], - "expand_wildcards": "open", - } - - put_mapping_mock.assert_called_with(**expected_params) + put_mapping_mock.assert_called_with(index_name, mappings) def set_responses(mock_responses, ts=None): @@ -257,21 +235,6 @@ def set_responses(mock_responses, ts=None): ) -@pytest.mark.asyncio -async def test_get_existing_ids(mock_responses): - config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"} - set_responses(mock_responses) - - es = SyncOrchestrator(config) - extractor = Extractor(es.client, None, "search-some-index") - ids = [] - async for doc_id, _ in extractor._get_existing_ids(): - ids.append(doc_id) - - assert ids == ["1", "2"] - await es.close() - - @pytest.mark.asyncio async def test_async_bulk(mock_responses): config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"} @@ -413,11 +376,16 @@ async def setup_extractor( sync_rules_enabled=False, content_extraction_enabled=False, ): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } # filtering content doesn't matter as the BasicRuleEngine behavior is mocked filter_mock = Mock() filter_mock.get_active_filter = Mock(return_value={}) extractor = Extractor( - None, + ESManagementClient(config), queue, INDEX, filter_=filter_mock, @@ -657,10 +625,10 @@ async def setup_extractor( ), ], ) -@mock.patch("connectors.es.sink.Extractor._get_existing_ids") +@mock.patch("connectors.es.client.ESManagementClient.yield_existing_documents_metadata") @pytest.mark.asyncio async def test_get_docs( - get_existing_ids, + yield_existing_documents_metadata, existing_docs, docs_from_source, doc_should_ingest, @@ -674,7 +642,7 @@ async def test_get_docs( ): lazy_downloads = await lazy_downloads_mock() - get_existing_ids.return_value = AsyncIterator( + yield_existing_documents_metadata.return_value = AsyncIterator( [(doc["_id"], doc["_timestamp"]) for doc in existing_docs] ) @@ -951,10 +919,10 @@ async def test_get_docs_incrementally( ), ], ) -@mock.patch("connectors.es.sink.Extractor._get_existing_ids") +@mock.patch("connectors.es.client.ESManagementClient.yield_existing_documents_metadata") @pytest.mark.asyncio async def test_get_access_control_docs( - get_existing_ids, + yield_existing_documents_metadata, existing_docs, docs_from_source, expected_queue_operations, @@ -962,7 +930,7 @@ async def test_get_access_control_docs( expected_total_docs_created, expected_total_docs_deleted, ): - get_existing_ids.return_value = AsyncIterator( + yield_existing_documents_metadata.return_value = AsyncIterator( [(doc["_id"], doc["_timestamp"]) for doc in existing_docs] ) @@ -1084,7 +1052,13 @@ def test_bulk_populate_stats(res, expected_result): @pytest.mark.asyncio async def test_batch_bulk_with_retry(): - client = Mock() + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + client = ESManagementClient(config) + client.client = AsyncMock() sink = Sink( client=client, queue=None, @@ -1097,10 +1071,10 @@ async def test_batch_bulk_with_retry(): with mock.patch.object(asyncio, "sleep"): # first call raises exception, and the second call succeeds - client.bulk = AsyncMock(side_effect=[Exception(), {"items": []}]) + client.client.bulk = AsyncMock(side_effect=[Exception(), {"items": []}]) await sink._batch_bulk([], {OP_INDEX: {}, OP_UPSERT: {}, OP_DELETE: {}}) - assert client.bulk.await_count == 2 + assert client.client.bulk.await_count == 2 @pytest.mark.parametrize( From 11d4a26bec3047e7ca284866ffac886334a4d69e Mon Sep 17 00:00:00 2001 From: Artem Shelkovnikov Date: Wed, 27 Dec 2023 16:15:01 +0100 Subject: [PATCH 2/5] Address feedback --- connectors/cli/auth.py | 2 +- connectors/cli/connector.py | 9 +- connectors/cli/index.py | 2 +- connectors/cli/job.py | 2 +- connectors/es/client.py | 134 --------------------------- connectors/es/management_client.py | 143 +++++++++++++++++++++++++++++ connectors/es/sink.py | 2 +- connectors/kibana.py | 2 +- connectors/preflight_check.py | 2 +- connectors/services/job_cleanup.py | 2 +- tests/es/test_client.py | 67 -------------- tests/es/test_management_client.py | 78 ++++++++++++++++ tests/services/test_job_cleanup.py | 2 +- tests/sources/fixtures/fixture.py | 2 +- tests/test_connectors_cli.py | 10 +- tests/test_kibana.py | 2 +- tests/test_sink.py | 10 +- 17 files changed, 247 insertions(+), 224 deletions(-) create mode 100644 connectors/es/management_client.py create mode 100644 tests/es/test_management_client.py diff --git a/connectors/cli/auth.py b/connectors/cli/auth.py index cee0d71e5..6cdf41cdc 100644 --- a/connectors/cli/auth.py +++ b/connectors/cli/auth.py @@ -4,7 +4,7 @@ import yaml from elasticsearch import ApiError -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient CONFIG_FILE_PATH = ".cli/config.yml" diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index 362031daa..15cb33f38 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -1,7 +1,7 @@ import asyncio from collections import OrderedDict -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.es.settings import DEFAULT_LANGUAGE from connectors.protocol import ( CONCRETE_CONNECTORS_INDEX, @@ -87,9 +87,6 @@ async def __create( finally: await self.es_management_client.close() - async def __create_search_index(self, index_name, language): - await self.es_management_client.create_content_index(index_name, language) - async def __create_connector( self, index_name, service_type, configuration, is_native, language, from_index ): @@ -101,7 +98,9 @@ async def __create_connector( timestamp = iso_utc() if not from_index: - await self.__create_search_index(index_name, language) + await self.es_management_client.create_content_index( + index_name, language + ) api_key = await self.__create_api_key(index_name) diff --git a/connectors/cli/index.py b/connectors/cli/index.py index e37a7b0aa..aa22d961d 100644 --- a/connectors/cli/index.py +++ b/connectors/cli/index.py @@ -2,7 +2,7 @@ from elasticsearch import ApiError -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.protocol import ConnectorIndex diff --git a/connectors/cli/job.py b/connectors/cli/job.py index f721498e4..ee1b713a7 100644 --- a/connectors/cli/job.py +++ b/connectors/cli/job.py @@ -2,7 +2,7 @@ from elasticsearch import ApiError -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.protocol import ( CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX, diff --git a/connectors/es/client.py b/connectors/es/client.py index f680db756..30170d6c0 100644 --- a/connectors/es/client.py +++ b/connectors/es/client.py @@ -14,13 +14,8 @@ from elasticsearch import ( ConnectionError as ElasticConnectionError, ) -from elasticsearch import ( - NotFoundError as ElasticNotFoundError, -) -from elasticsearch.helpers import async_scan from connectors import __version__ -from connectors.es.settings import TIMESTAMP_FIELD, Mappings, Settings from connectors.logger import logger, set_extra_logger from connectors.utils import CancellableSleeps @@ -168,135 +163,6 @@ async def wait(self): return False -class ESManagementClient(ESClient): - """ - Elasticsearch client with methods to manage connector-related indices. - - Additionally to regular methods of ESClient, this client provides methods to work with arbitrary indices, - for example allowing to list indices, delete indices, wipe data from indices and such. - - ESClient should be used to provide rich clients that operate on "domains", such as: - - specific connector - - specific job - - This client, on the contrary, is used to manage a number of indices outside of connector protocol operations. - """ - - def __init__(self, config): - logger.debug(f"ESManagementClient connecting to {config['host']}") - # initialize ESIndex instance - super().__init__(config) - - async def ensure_exists(self, indices=None, expand_wildcards="open"): - if indices is None: - indices = [] - - for index in indices: - logger.debug( - f"Checking index {index} with expand_wildcards={expand_wildcards}" - ) - if not await self.client.indices.exists( - index=index, expand_wildcards=expand_wildcards - ): - await self.client.indices.create(index=index) - logger.debug(f"Created index {index}") - - async def create_content_index(self, search_index_name, language_code): - settings = Settings(language_code=language_code, analysis_icu=False).to_hash() - mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) - - return await self.client.indices.create( - index=search_index_name, mappings=mappings, settings=settings - ) - - async def ensure_content_index_mappings(self, index, mappings): - # open = Match open, non-hidden indices. Also matches any non-hidden data stream. - # Content indices are always non-hidden. - expand_wildcards = "open" - response = await self.client.indices.get_mapping( - index=index, expand_wildcards=expand_wildcards - ) - - existing_mappings = response[index].get("mappings", {}) - if len(existing_mappings) == 0 and mappings: - logger.debug( - "Index %s has no mappings or it's empty. Adding mappings...", index - ) - await self.client.indices.put_mapping( - index=index, - dynamic=mappings.get("dynamic", False), - dynamic_templates=mappings.get("dynamic_templates", []), - properties=mappings.get("properties", {}), - expand_wildcards=expand_wildcards, - ) - logger.debug("Successfully added mappings for index %s", index) - else: - logger.debug( - "Index %s already has mappings, skipping mappings creation", index - ) - - async def ensure_ingest_pipeline_exists( - self, pipeline_id, version, description, processors - ): - try: - await self.client.ingest.get_pipeline(id=pipeline_id) - except ElasticNotFoundError: - await self.client.ingest.put_pipeline( - id=pipeline_id, - version=version, - description=version, - processors=version, - ) - - async def delete_indices(self, indices, expand_wildcards="open"): - await self.client.indices.delete( - index=indices, expand_wildcards=expand_wildcards, ignore_unavailable=True - ) - - async def clean_index(self, index_name): - return await self.client.delete_by_query( - index=index_name, body={"query": {"match_all": {}}}, ignore_unavailable=True - ) - - async def list_indices(self): - return await self.client.indices.stats(index="search-*") - - async def index_exists(self, index_name, expand_wildcards="open"): - return await self.client.indices.exists( - index=index_name, expand_wildcards=expand_wildcards - ) - - async def upsert(self, _id, index_name, doc): - await self.client.index( - id=_id, - index=index_name, - document=doc, - ) - - async def yield_existing_documents_metadata(self, index): - """Returns an iterator on the `id` and `_timestamp` fields of all documents in an index. - - WARNING - - This function will load all ids in memory -- on very large indices, - depending on the id length, it can be quite large. - - 300,000 ids will be around 50MiB - """ - logger.debug(f"Scanning existing index {index}") - if not self.index_exists(index): - return - - async for doc in async_scan( - client=self.client, index=index, _source=["id", TIMESTAMP_FIELD] - ): - source = doc["_source"] - doc_id = source.get("id", doc["_id"]) - timestamp = source.get(TIMESTAMP_FIELD) - - yield doc_id, timestamp - - def with_concurrency_control(retries=3): def wrapper(func): @functools.wraps(func) diff --git a/connectors/es/management_client.py b/connectors/es/management_client.py new file mode 100644 index 000000000..ed4c42dfa --- /dev/null +++ b/connectors/es/management_client.py @@ -0,0 +1,143 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. +# + +from elasticsearch import ( + NotFoundError as ElasticNotFoundError, +) +from elasticsearch.helpers import async_scan + +from connectors.es.client import ESClient +from connectors.es.settings import TIMESTAMP_FIELD, Mappings, Settings +from connectors.logger import logger + + +class ESManagementClient(ESClient): + """ + Elasticsearch client with methods to manage connector-related indices. + + Additionally to regular methods of ESClient, this client provides methods to work with arbitrary indices, + for example allowing to list indices, delete indices, wipe data from indices and such. + + ESClient should be used to provide rich clients that operate on "domains", such as: + - specific connector + - specific job + + This client, on the contrary, is used to manage a number of indices outside of connector protocol operations. + """ + + def __init__(self, config): + logger.debug(f"ESManagementClient connecting to {config['host']}") + # initialize ESIndex instance + super().__init__(config) + + async def ensure_exists(self, indices=None, expand_wildcards="open"): + if indices is None: + indices = [] + + for index in indices: + logger.debug( + f"Checking index {index} with expand_wildcards={expand_wildcards}" + ) + if not await self.client.indices.exists( + index=index, expand_wildcards=expand_wildcards + ): + await self.client.indices.create(index=index) + logger.debug(f"Created index {index}") + + async def create_content_index(self, search_index_name, language_code): + settings = Settings(language_code=language_code, analysis_icu=False).to_hash() + mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) + + return await self.client.indices.create( + index=search_index_name, mappings=mappings, settings=settings + ) + + async def ensure_content_index_mappings(self, index, mappings): + # open = Match open, non-hidden indices. Also matches any non-hidden data stream. + # Content indices are always non-hidden. + expand_wildcards = "open" + response = await self.client.indices.get_mapping( + index=index, expand_wildcards=expand_wildcards + ) + + existing_mappings = response[index].get("mappings", {}) + if len(existing_mappings) == 0 and mappings: + logger.debug( + "Index %s has no mappings or it's empty. Adding mappings...", index + ) + await self.client.indices.put_mapping( + index=index, + dynamic=mappings.get("dynamic", False), + dynamic_templates=mappings.get("dynamic_templates", []), + properties=mappings.get("properties", {}), + expand_wildcards=expand_wildcards, + ) + logger.debug("Successfully added mappings for index %s", index) + else: + logger.debug( + "Index %s already has mappings, skipping mappings creation", index + ) + + async def ensure_ingest_pipeline_exists( + self, pipeline_id, version, description, processors + ): + try: + await self.client.ingest.get_pipeline(id=pipeline_id) + except ElasticNotFoundError: + await self.client.ingest.put_pipeline( + id=pipeline_id, + version=version, + description=version, + processors=version, + ) + + async def delete_indices(self, indices, expand_wildcards="open"): + await self.client.indices.delete( + index=indices, expand_wildcards=expand_wildcards, ignore_unavailable=True + ) + + async def clean_index(self, index_name): + return await self.client.delete_by_query( + index=index_name, body={"query": {"match_all": {}}}, ignore_unavailable=True + ) + + async def list_indices(self): + return await self.client.indices.stats(index="search-*") + + async def index_exists(self, index_name, expand_wildcards="open"): + return await self.client.indices.exists( + index=index_name, expand_wildcards=expand_wildcards + ) + + async def upsert(self, _id, index_name, doc): + await self.client.index( + id=_id, + index=index_name, + document=doc, + ) + + async def yield_existing_documents_metadata(self, index): + """Returns an iterator on the `id` and `_timestamp` fields of all documents in an index. + + WARNING + + This function will load all ids in memory -- on very large indices, + depending on the id length, it can be quite large. + + 300,000 ids will be around 50MiB + """ + logger.debug(f"Scanning existing index {index}") + if not self.index_exists(index): + return + + async for doc in async_scan( + client=self.client, index=index, _source=["id", TIMESTAMP_FIELD] + ): + source = doc["_source"] + doc_id = source.get("id", doc["_id"]) + timestamp = source.get(TIMESTAMP_FIELD) + + yield doc_id, timestamp diff --git a/connectors/es/sink.py b/connectors/es/sink.py index 6533291db..0e294c303 100644 --- a/connectors/es/sink.py +++ b/connectors/es/sink.py @@ -24,7 +24,7 @@ import time from collections import defaultdict -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.es.settings import TIMESTAMP_FIELD, Mappings from connectors.filtering.basic_rule import BasicRuleEngine, parse from connectors.logger import logger, tracer diff --git a/connectors/kibana.py b/connectors/kibana.py index c37d51a51..8302a3c07 100644 --- a/connectors/kibana.py +++ b/connectors/kibana.py @@ -11,7 +11,7 @@ from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser from connectors.config import load_config -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.es.settings import DEFAULT_LANGUAGE from connectors.logger import set_extra_logger from connectors.source import get_source_klass diff --git a/connectors/preflight_check.py b/connectors/preflight_check.py index ed6643afb..dc556c5bd 100644 --- a/connectors/preflight_check.py +++ b/connectors/preflight_check.py @@ -6,7 +6,7 @@ import aiohttp -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.logger import logger from connectors.protocol import CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX from connectors.utils import CancellableSleeps diff --git a/connectors/services/job_cleanup.py b/connectors/services/job_cleanup.py index d3d471e1e..30f412a0f 100644 --- a/connectors/services/job_cleanup.py +++ b/connectors/services/job_cleanup.py @@ -7,8 +7,8 @@ A task periodically clean up orphaned and idle jobs. """ -from connectors.es.client import ESManagementClient from connectors.es.index import DocumentNotFoundError +from connectors.es.management_client import ESManagementClient from connectors.logger import logger from connectors.protocol import ConnectorIndex, SyncJobIndex from connectors.services.base import BaseService diff --git a/tests/es/test_client.py b/tests/es/test_client.py index 683d087d6..806b7093f 100644 --- a/tests/es/test_client.py +++ b/tests/es/test_client.py @@ -4,7 +4,6 @@ # you may not use this file except in compliance with the Elastic License 2.0. # import base64 -from datetime import datetime from unittest import mock from unittest.mock import AsyncMock, Mock @@ -13,11 +12,9 @@ from connectors.es.client import ( ESClient, - ESManagementClient, License, with_concurrency_control, ) -from tests.commons import AsyncIterator BASIC_CONFIG = {"username": "elastic", "password": "changeme"} API_CONFIG = {"api_key": "foo"} @@ -235,67 +232,3 @@ async def test_es_client_no_server(self): # Execute assert not await es_client.ping() await es_client.close() - - -class TestESManagementClient: - @pytest.mark.asyncio - async def test_index_exists(self): - config = { - "username": "elastic", - "password": "changeme", - "host": "http://nowhere.com:9200", - } - index_name = "search-mongo" - es_management_client = ESManagementClient(config) - es_management_client.client = Mock() - es_management_client.client.indices.exists = AsyncMock() - - await es_management_client.index_exists(index_name=index_name) - es_management_client.client.indices.exists.assert_awaited_with( - index=index_name, expand_wildcards="open" - ) - - @pytest.mark.asyncio - async def test_delete_indices(self): - config = { - "username": "elastic", - "password": "changeme", - "host": "http://nowhere.com:9200", - } - indices = ["search-mongo"] - es_management_client = ESManagementClient(config) - es_management_client.client = Mock() - es_management_client.client.indices.delete = AsyncMock() - - await es_management_client.delete_indices(indices=indices) - es_management_client.client.indices.delete.assert_awaited_with( - index=indices, expand_wildcards="open", ignore_unavailable=True - ) - - @pytest.mark.asyncio - async def test_yield_existing_documents_metadata(self, mock_responses): - config = { - "username": "elastic", - "password": "changeme", - "host": "http://nowhere.com:9200", - } - es_management_client = ESManagementClient(config) - es_management_client.client = AsyncMock() - es_management_client.client.index_exists = Mock(return_value=True) - - records = [ - {"_id": "1", "_source": {"_timestamp": str(datetime.now())}}, - {"_id": "2", "_source": {"_timestamp": str(datetime.now())}}, - ] - - with mock.patch( - "connectors.es.client.async_scan", return_value=AsyncIterator(records) - ): - ids = [] - async for doc_id, _ in es_management_client.yield_existing_documents_metadata( - "something" - ): - ids.append(doc_id) - - assert ids == ["1", "2"] - await es_management_client.close() diff --git a/tests/es/test_management_client.py b/tests/es/test_management_client.py new file mode 100644 index 000000000..be4524719 --- /dev/null +++ b/tests/es/test_management_client.py @@ -0,0 +1,78 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. +# +from datetime import datetime +from unittest import mock +from unittest.mock import AsyncMock, Mock + +import pytest + +from connectors.es.management_client import ESManagementClient +from tests.commons import AsyncIterator + + +class TestESManagementClient: + @pytest.mark.asyncio + async def test_index_exists(self): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + index_name = "search-mongo" + es_management_client = ESManagementClient(config) + es_management_client.client = Mock() + es_management_client.client.indices.exists = AsyncMock() + + await es_management_client.index_exists(index_name=index_name) + es_management_client.client.indices.exists.assert_awaited_with( + index=index_name, expand_wildcards="open" + ) + + @pytest.mark.asyncio + async def test_delete_indices(self): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + indices = ["search-mongo"] + es_management_client = ESManagementClient(config) + es_management_client.client = Mock() + es_management_client.client.indices.delete = AsyncMock() + + await es_management_client.delete_indices(indices=indices) + es_management_client.client.indices.delete.assert_awaited_with( + index=indices, expand_wildcards="open", ignore_unavailable=True + ) + + @pytest.mark.asyncio + async def test_yield_existing_documents_metadata(self, mock_responses): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + es_management_client = ESManagementClient(config) + es_management_client.client = AsyncMock() + es_management_client.client.index_exists = Mock(return_value=True) + + records = [ + {"_id": "1", "_source": {"_timestamp": str(datetime.now())}}, + {"_id": "2", "_source": {"_timestamp": str(datetime.now())}}, + ] + + with mock.patch( + "connectors.es.management_client.async_scan", + return_value=AsyncIterator(records), + ): + ids = [] + async for doc_id, _ in es_management_client.yield_existing_documents_metadata( + "something" + ): + ids.append(doc_id) + + assert ids == ["1", "2"] + await es_management_client.close() diff --git a/tests/services/test_job_cleanup.py b/tests/services/test_job_cleanup.py index 65b1251d1..6308c556b 100644 --- a/tests/services/test_job_cleanup.py +++ b/tests/services/test_job_cleanup.py @@ -47,7 +47,7 @@ def mock_sync_job(sync_job_id="1", connector_id="1", index_name="index_name"): @pytest.mark.asyncio @patch("connectors.protocol.SyncJobIndex.delete_jobs") -@patch("connectors.es.client.ESManagementClient.delete_indices") +@patch("connectors.es.management_client.ESManagementClient.delete_indices") @patch("connectors.protocol.SyncJobIndex.idle_jobs") @patch("connectors.protocol.SyncJobIndex.orphaned_jobs") @patch("connectors.protocol.ConnectorIndex.fetch_by_id") diff --git a/tests/sources/fixtures/fixture.py b/tests/sources/fixtures/fixture.py index 91c220ce1..e0608f9c2 100644 --- a/tests/sources/fixtures/fixture.py +++ b/tests/sources/fixtures/fixture.py @@ -20,7 +20,7 @@ from elastic_transport import ConnectionTimeout from elasticsearch import ApiError -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.logger import set_extra_logger from connectors.utils import ( RetryStrategy, diff --git a/tests/test_connectors_cli.py b/tests/test_connectors_cli.py index 573e6cd4f..2aa16e84c 100644 --- a/tests/test_connectors_cli.py +++ b/tests/test_connectors_cli.py @@ -454,7 +454,7 @@ def test_index_list_one_index(): indices = {"indices": {"test_index": {"primaries": {"docs": {"count": 10}}}}} with patch( - "connectors.es.client.ESManagementClient.list_indices", + "connectors.es.management_client.ESManagementClient.list_indices", AsyncMock(return_value=indices), ): result = runner.invoke(cli, ["index", "list"]) @@ -468,7 +468,7 @@ def test_index_clean(): runner = CliRunner() index_name = "test_index" with patch( - "connectors.es.client.ESManagementClient.clean_index", + "connectors.es.management_client.ESManagementClient.clean_index", AsyncMock(return_value=True), ) as mocked_method: result = runner.invoke(cli, ["index", "clean", index_name]) @@ -483,7 +483,7 @@ def test_index_clean_error(): runner = CliRunner() index_name = "test_index" with patch( - "connectors.es.client.ESManagementClient.clean_index", + "connectors.es.management_client.ESManagementClient.clean_index", side_effect=ApiError(500, meta="meta", body="error"), ): result = runner.invoke(cli, ["index", "clean", index_name]) @@ -497,7 +497,7 @@ def test_index_delete(): runner = CliRunner() index_name = "test_index" with patch( - "connectors.es.client.ESManagementClient.delete_indices", + "connectors.es.management_client.ESManagementClient.delete_indices", AsyncMock(return_value=None), ) as mocked_method: result = runner.invoke(cli, ["index", "delete", index_name]) @@ -512,7 +512,7 @@ def test_delete_index_error(): runner = CliRunner() index_name = "test_index" with patch( - "connectors.es.client.ESManagementClient.delete_indices", + "connectors.es.management_client.ESManagementClient.delete_indices", side_effect=ApiError(500, meta="meta", body="error"), ): result = runner.invoke(cli, ["index", "delete", index_name]) diff --git a/tests/test_kibana.py b/tests/test_kibana.py index c87a40ef6..415903599 100644 --- a/tests/test_kibana.py +++ b/tests/test_kibana.py @@ -8,7 +8,7 @@ import pytest -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.kibana import main, upsert_index HERE = os.path.dirname(__file__) diff --git a/tests/test_sink.py b/tests/test_sink.py index 4465cbf62..10f5aaa86 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -14,7 +14,7 @@ from elasticsearch import BadRequestError from connectors.es import Mappings -from connectors.es.client import ESManagementClient +from connectors.es.management_client import ESManagementClient from connectors.es.sink import ( OP_DELETE, OP_INDEX, @@ -625,7 +625,9 @@ async def setup_extractor( ), ], ) -@mock.patch("connectors.es.client.ESManagementClient.yield_existing_documents_metadata") +@mock.patch( + "connectors.es.management_client.ESManagementClient.yield_existing_documents_metadata" +) @pytest.mark.asyncio async def test_get_docs( yield_existing_documents_metadata, @@ -919,7 +921,9 @@ async def test_get_docs_incrementally( ), ], ) -@mock.patch("connectors.es.client.ESManagementClient.yield_existing_documents_metadata") +@mock.patch( + "connectors.es.management_client.ESManagementClient.yield_existing_documents_metadata" +) @pytest.mark.asyncio async def test_get_access_control_docs( yield_existing_documents_metadata, From 69343c79098c77f0d1882eac5d7d7d3baa1df70d Mon Sep 17 00:00:00 2001 From: Artem Shelkovnikov Date: Thu, 28 Dec 2023 11:27:18 +0100 Subject: [PATCH 3/5] Add more tests, fix some obvious bugs --- connectors/es/management_client.py | 35 +++-- tests/es/test_management_client.py | 229 +++++++++++++++++++++++++---- tests/test_sink.py | 5 + 3 files changed, 229 insertions(+), 40 deletions(-) diff --git a/connectors/es/management_client.py b/connectors/es/management_client.py index ed4c42dfa..0a7201f14 100644 --- a/connectors/es/management_client.py +++ b/connectors/es/management_client.py @@ -64,18 +64,23 @@ async def ensure_content_index_mappings(self, index, mappings): ) existing_mappings = response[index].get("mappings", {}) - if len(existing_mappings) == 0 and mappings: - logger.debug( - "Index %s has no mappings or it's empty. Adding mappings...", index - ) - await self.client.indices.put_mapping( - index=index, - dynamic=mappings.get("dynamic", False), - dynamic_templates=mappings.get("dynamic_templates", []), - properties=mappings.get("properties", {}), - expand_wildcards=expand_wildcards, - ) - logger.debug("Successfully added mappings for index %s", index) + if len(existing_mappings) == 0: + if mappings: + logger.debug( + "Index %s has no mappings or it's empty. Adding mappings...", index + ) + await self.client.indices.put_mapping( + index=index, + dynamic=mappings.get("dynamic", False), + dynamic_templates=mappings.get("dynamic_templates", []), + properties=mappings.get("properties", {}), + expand_wildcards=expand_wildcards, + ) + logger.debug("Successfully added mappings for index %s", index) + else: + logger.debug( + "Index %s has no mappings but no mappings are provided, skipping mappings creation" + ) else: logger.debug( "Index %s already has mappings, skipping mappings creation", index @@ -90,8 +95,8 @@ async def ensure_ingest_pipeline_exists( await self.client.ingest.put_pipeline( id=pipeline_id, version=version, - description=version, - processors=version, + description=description, + processors=processors, ) async def delete_indices(self, indices, expand_wildcards="open"): @@ -130,7 +135,7 @@ async def yield_existing_documents_metadata(self, index): 300,000 ids will be around 50MiB """ logger.debug(f"Scanning existing index {index}") - if not self.index_exists(index): + if not await self.index_exists(index): return async for doc in async_scan( diff --git a/tests/es/test_management_client.py b/tests/es/test_management_client.py index be4524719..b8bf03c47 100644 --- a/tests/es/test_management_client.py +++ b/tests/es/test_management_client.py @@ -5,42 +5,159 @@ # from datetime import datetime from unittest import mock -from unittest.mock import AsyncMock, Mock +from unittest.mock import ANY, AsyncMock import pytest +import pytest_asyncio +from elasticsearch import ( + NotFoundError as ElasticNotFoundError, +) from connectors.es.management_client import ESManagementClient from tests.commons import AsyncIterator class TestESManagementClient: - @pytest.mark.asyncio - async def test_index_exists(self): + @pytest_asyncio.fixture + def es_management_client(self): config = { "username": "elastic", "password": "changeme", "host": "http://nowhere.com:9200", } - index_name = "search-mongo" es_management_client = ESManagementClient(config) - es_management_client.client = Mock() - es_management_client.client.indices.exists = AsyncMock() - await es_management_client.index_exists(index_name=index_name) - es_management_client.client.indices.exists.assert_awaited_with( + es_management_client.client = AsyncMock() + + yield es_management_client + + @pytest.mark.asyncio + async def test_ensure_exists_when_no_indices_passed(self, es_management_client): + await es_management_client.ensure_exists() + + es_management_client.client.indices.exists.assert_not_called() + + @pytest.mark.asyncio + async def test_ensure_exists_when_indices_passed(self, es_management_client): + index_name = "search-mongo" + es_management_client.client.indices.exists.return_value = False + + await es_management_client.ensure_exists([index_name]) + + es_management_client.client.indices.exists.assert_called_with( index=index_name, expand_wildcards="open" ) + es_management_client.client.indices.create.assert_called_with(index=index_name) @pytest.mark.asyncio - async def test_delete_indices(self): - config = { - "username": "elastic", - "password": "changeme", - "host": "http://nowhere.com:9200", + async def test_create_content_index(self, es_management_client): + index_name = "search-mongo" + lang_code = "en" + await es_management_client.create_content_index(index_name, lang_code) + + es_management_client.client.indices.create.assert_called_with( + index=index_name, mappings=ANY, settings=ANY + ) + + @pytest.mark.asyncio + async def test_ensure_content_index_mappings_when_mappings_exist( + self, es_management_client + ): + index_name = "something" + mappings = {} + existing_mappings_response = {index_name: {"mappings": ["something"]}} + + es_management_client.client.indices.get_mapping = AsyncMock( + return_value=existing_mappings_response + ) + + await es_management_client.ensure_content_index_mappings(index_name, mappings) + es_management_client.client.indices.put_mapping.assert_not_called() + + @pytest.mark.asyncio + async def test_ensure_content_index_mappings_when_mappings_do_not_exist( + self, es_management_client + ): + index_name = "something" + mappings = { + "dynamic": True, + "dynamic_templates": ["something"], + "properties": "something_else", } + existing_mappings_response = {index_name: {"mappings": {}}} + + es_management_client.client.indices.get_mapping = AsyncMock( + return_value=existing_mappings_response + ) + + await es_management_client.ensure_content_index_mappings(index_name, mappings) + es_management_client.client.indices.put_mapping.assert_awaited_with( + index=index_name, + dynamic=mappings["dynamic"], + dynamic_templates=mappings["dynamic_templates"], + properties=mappings["properties"], + expand_wildcards=ANY, + ) + + @pytest.mark.asyncio + async def test_ensure_content_index_mappings_when_mappings_do_not_exist_but_no_mappings_provided( + self, es_management_client + ): + index_name = "something" + mappings = None + existing_mappings_response = {index_name: {"mappings": {}}} + + es_management_client.client.indices.get_mapping = AsyncMock( + return_value=existing_mappings_response + ) + + await es_management_client.ensure_content_index_mappings(index_name, mappings) + es_management_client.client.indices.put_mapping.assert_not_called() + + @pytest.mark.asyncio + async def test_ensure_ingest_pipeline_exists_when_pipeline_do_not_exist( + self, es_management_client + ): + pipeline_id = 1 + version = 2 + description = "that's a pipeline" + processors = ["something"] + + es_management_client.client.ingest.get_pipeline.side_effect = ( + ElasticNotFoundError("1", "2", "3") + ) + + await es_management_client.ensure_ingest_pipeline_exists( + pipeline_id, version, description, processors + ) + + es_management_client.client.ingest.put_pipeline.assert_awaited_with( + id=pipeline_id, + version=version, + description=description, + processors=processors, + ) + + @pytest.mark.asyncio + async def test_ensure_ingest_pipeline_exists_when_pipeline_exists( + self, es_management_client + ): + pipeline_id = 1 + version = 2 + description = "that's a pipeline" + processors = ["something"] + + es_management_client.client.ingest.get_pipeline = AsyncMock() + + await es_management_client.ensure_ingest_pipeline_exists( + pipeline_id, version, description, processors + ) + + es_management_client.client.ingest.put_pipeline.assert_not_called() + + @pytest.mark.asyncio + async def test_delete_indices(self, es_management_client): indices = ["search-mongo"] - es_management_client = ESManagementClient(config) - es_management_client.client = Mock() es_management_client.client.indices.delete = AsyncMock() await es_management_client.delete_indices(indices=indices) @@ -49,15 +166,78 @@ async def test_delete_indices(self): ) @pytest.mark.asyncio - async def test_yield_existing_documents_metadata(self, mock_responses): - config = { - "username": "elastic", - "password": "changeme", - "host": "http://nowhere.com:9200", - } - es_management_client = ESManagementClient(config) - es_management_client.client = AsyncMock() - es_management_client.client.index_exists = Mock(return_value=True) + async def test_index_exists(self, es_management_client): + index_name = "search-mongo" + es_management_client.client.indices.exists = AsyncMock() + + await es_management_client.index_exists(index_name=index_name) + es_management_client.client.indices.exists.assert_awaited_with( + index=index_name, expand_wildcards="open" + ) + + @pytest.mark.asyncio + async def test_clean_index(self, es_management_client): + index_name = "search-mongo" + es_management_client.client.indices.exists = AsyncMock() + + await es_management_client.clean_index(index_name=index_name) + es_management_client.client.delete_by_query.assert_awaited_with( + index=index_name, body=ANY, ignore_unavailable=ANY + ) + + @pytest.mark.asyncio + async def test_list_indices(self, es_management_client): + await es_management_client.list_indices() + + es_management_client.client.indices.stats.assert_awaited_with(index="search-*") + + @pytest.mark.asyncio + async def test_index_exists(self, es_management_client): + index_name = "search-mongo" + es_management_client.client.indices.exists = AsyncMock(return_value=True) + + assert await es_management_client.index_exists(index_name=index_name) is True + + @pytest.mark.asyncio + async def test_upsert(self, es_management_client): + _id = "123" + index_name = "search-mongo" + document = {"something": "something"} + + await es_management_client.upsert(_id, index_name, document) + + assert await es_management_client.client.index( + id=_id, index=index_name, doc=document + ) + + @pytest.mark.asyncio + async def test_yield_existing_documents_metadata_when_index_does_not_exist( + self, es_management_client, mock_responses + ): + es_management_client.index_exists = AsyncMock(return_value=False) + + records = [ + {"_id": "1", "_source": {"_timestamp": str(datetime.now())}}, + {"_id": "2", "_source": {"_timestamp": str(datetime.now())}}, + ] + + with mock.patch( + "connectors.es.management_client.async_scan", + return_value=AsyncIterator(records), + ): + ids = [] + async for doc_id, _ in es_management_client.yield_existing_documents_metadata( + "something" + ): + ids.append(doc_id) + + assert ids == [] + + @pytest.mark.asyncio + async def test_yield_existing_documents_metadata_when_index_exists( + self, es_management_client, mock_responses + ): + es_management_client.index_exists = AsyncMock(return_value=True) records = [ {"_id": "1", "_source": {"_timestamp": str(datetime.now())}}, @@ -75,4 +255,3 @@ async def test_yield_existing_documents_metadata(self, mock_responses): ids.append(doc_id) assert ids == ["1", "2"] - await es_management_client.close() diff --git a/tests/test_sink.py b/tests/test_sink.py index 10f5aaa86..f3d8e9830 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -158,6 +158,11 @@ def set_responses(mock_responses, ts=None): if ts is None: ts = datetime.datetime.now().isoformat() headers = {"X-Elastic-Product": "Elasticsearch"} + mock_responses.head( + "http://nowhere.com:9200/search-some-index?expand_wildcards=open", + headers=headers, + ) + mock_responses.get( "http://nowhere.com:9200/search-some-index", payload={"_id": "1"}, From 9e7e7a86ea8e0617f339e186e9f91a6c88f9ff6d Mon Sep 17 00:00:00 2001 From: Artem Shelkovnikov Date: Wed, 3 Jan 2024 15:31:50 +0100 Subject: [PATCH 4/5] Remove unnecessary expand_wildcard option --- connectors/es/management_client.py | 28 ++++++++-------------------- connectors/kibana.py | 9 ++------- connectors/preflight_check.py | 1 - tests/es/test_management_client.py | 11 +++-------- tests/test_kibana.py | 12 ++++-------- tests/test_preflight_check.py | 4 +--- tests/test_sink.py | 12 ++++++------ 7 files changed, 24 insertions(+), 53 deletions(-) diff --git a/connectors/es/management_client.py b/connectors/es/management_client.py index 0a7201f14..496ad8709 100644 --- a/connectors/es/management_client.py +++ b/connectors/es/management_client.py @@ -33,17 +33,13 @@ def __init__(self, config): # initialize ESIndex instance super().__init__(config) - async def ensure_exists(self, indices=None, expand_wildcards="open"): + async def ensure_exists(self, indices=None): if indices is None: indices = [] for index in indices: - logger.debug( - f"Checking index {index} with expand_wildcards={expand_wildcards}" - ) - if not await self.client.indices.exists( - index=index, expand_wildcards=expand_wildcards - ): + logger.debug(f"Checking index {index}") + if not await self.client.indices.exists(index=index): await self.client.indices.create(index=index) logger.debug(f"Created index {index}") @@ -58,10 +54,7 @@ async def create_content_index(self, search_index_name, language_code): async def ensure_content_index_mappings(self, index, mappings): # open = Match open, non-hidden indices. Also matches any non-hidden data stream. # Content indices are always non-hidden. - expand_wildcards = "open" - response = await self.client.indices.get_mapping( - index=index, expand_wildcards=expand_wildcards - ) + response = await self.client.indices.get_mapping(index=index) existing_mappings = response[index].get("mappings", {}) if len(existing_mappings) == 0: @@ -74,7 +67,6 @@ async def ensure_content_index_mappings(self, index, mappings): dynamic=mappings.get("dynamic", False), dynamic_templates=mappings.get("dynamic_templates", []), properties=mappings.get("properties", {}), - expand_wildcards=expand_wildcards, ) logger.debug("Successfully added mappings for index %s", index) else: @@ -99,10 +91,8 @@ async def ensure_ingest_pipeline_exists( processors=processors, ) - async def delete_indices(self, indices, expand_wildcards="open"): - await self.client.indices.delete( - index=indices, expand_wildcards=expand_wildcards, ignore_unavailable=True - ) + async def delete_indices(self, indices): + await self.client.indices.delete(index=indices, ignore_unavailable=True) async def clean_index(self, index_name): return await self.client.delete_by_query( @@ -112,10 +102,8 @@ async def clean_index(self, index_name): async def list_indices(self): return await self.client.indices.stats(index="search-*") - async def index_exists(self, index_name, expand_wildcards="open"): - return await self.client.indices.exists( - index=index_name, expand_wildcards=expand_wildcards - ) + async def index_exists(self, index_name): + return await self.client.indices.exists(index=index_name) async def upsert(self, _id, index_name, doc): await self.client.index( diff --git a/connectors/kibana.py b/connectors/kibana.py index 8302a3c07..7647d63e4 100644 --- a/connectors/kibana.py +++ b/connectors/kibana.py @@ -177,17 +177,12 @@ async def upsert_index(es, index): this logic. """ - if index.startswith("."): - expand_wildcards = "hidden" - else: - expand_wildcards = "open" - - exists = await es.index_exists(index, expand_wildcards) + exists = await es.index_exists(index) if exists: logger.debug(f"{index} exists, deleting...") logger.debug("Deleting it first") - await es.delete_indices([index], expand_wildcards) + await es.delete_indices([index]) logger.debug(f"Creating index {index}") await es.create_content_index(index, DEFAULT_LANGUAGE) diff --git a/connectors/preflight_check.py b/connectors/preflight_check.py index dc556c5bd..b43293d4e 100644 --- a/connectors/preflight_check.py +++ b/connectors/preflight_check.py @@ -98,7 +98,6 @@ async def _check_system_indices_with_retries(self): await self.es_management_client.ensure_exists( indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], - expand_wildcards="all", ) return True except Exception as e: diff --git a/tests/es/test_management_client.py b/tests/es/test_management_client.py index b8bf03c47..ce9849979 100644 --- a/tests/es/test_management_client.py +++ b/tests/es/test_management_client.py @@ -44,9 +44,7 @@ async def test_ensure_exists_when_indices_passed(self, es_management_client): await es_management_client.ensure_exists([index_name]) - es_management_client.client.indices.exists.assert_called_with( - index=index_name, expand_wildcards="open" - ) + es_management_client.client.indices.exists.assert_called_with(index=index_name) es_management_client.client.indices.create.assert_called_with(index=index_name) @pytest.mark.asyncio @@ -96,7 +94,6 @@ async def test_ensure_content_index_mappings_when_mappings_do_not_exist( dynamic=mappings["dynamic"], dynamic_templates=mappings["dynamic_templates"], properties=mappings["properties"], - expand_wildcards=ANY, ) @pytest.mark.asyncio @@ -162,7 +159,7 @@ async def test_delete_indices(self, es_management_client): await es_management_client.delete_indices(indices=indices) es_management_client.client.indices.delete.assert_awaited_with( - index=indices, expand_wildcards="open", ignore_unavailable=True + index=indices, ignore_unavailable=True ) @pytest.mark.asyncio @@ -171,9 +168,7 @@ async def test_index_exists(self, es_management_client): es_management_client.client.indices.exists = AsyncMock() await es_management_client.index_exists(index_name=index_name) - es_management_client.client.indices.exists.assert_awaited_with( - index=index_name, expand_wildcards="open" - ) + es_management_client.client.indices.exists.assert_awaited_with(index=index_name) @pytest.mark.asyncio async def test_clean_index(self, es_management_client): diff --git a/tests/test_kibana.py b/tests/test_kibana.py index 415903599..d6c9a170d 100644 --- a/tests/test_kibana.py +++ b/tests/test_kibana.py @@ -17,17 +17,13 @@ def mock_index_creation(index, mock_responses, hidden=True): url = f"http://nowhere.com:9200/{index}" - if hidden: - url += "?expand_wildcards=hidden" - else: - url += "?expand_wildcards=open" headers = {"X-Elastic-Product": "Elasticsearch"} mock_responses.head( url, headers=headers, ) mock_responses.delete( - f"{url}&ignore_unavailable=true", + f"{url}?ignore_unavailable=true", headers=headers, ) mock_responses.put( @@ -76,7 +72,7 @@ async def test_upsert_index(mock_responses): "http://nowhere.com:9200/.elastic-connectors/_refresh", headers=headers ) mock_responses.head( - "http://nowhere.com:9200/search-new-index?expand_wildcards=open", + "http://nowhere.com:9200/search-new-index", headers=headers, status=404, ) @@ -91,11 +87,11 @@ async def test_upsert_index(mock_responses): await upsert_index(es, "search-new-index") mock_responses.head( - "http://nowhere.com:9200/search-new-index?expand_wildcards=open", + "http://nowhere.com:9200/search-new-index", headers=headers, ) mock_responses.delete( - "http://nowhere.com:9200/search-new-index?expand_wildcards=open&ignore_unavailable=true", + "http://nowhere.com:9200/search-new-index?ignore_unavailable=true", headers=headers, ) mock_responses.put( diff --git a/tests/test_preflight_check.py b/tests/test_preflight_check.py index d481886ca..7f25d0183 100644 --- a/tests/test_preflight_check.py +++ b/tests/test_preflight_check.py @@ -38,9 +38,7 @@ def mock_es_info(mock_responses, healthy=True, repeat=False): def mock_index_exists(mock_responses, index, exist=True, repeat=False): status = 200 if exist else 404 - mock_responses.head( - f"{host}/{index}?expand_wildcards=all", status=status, repeat=repeat - ) + mock_responses.head(f"{host}/{index}", status=status, repeat=repeat) def mock_index(mock_responses, index, doc_id, repeat=False): diff --git a/tests/test_sink.py b/tests/test_sink.py index f3d8e9830..3b5accc7b 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -60,7 +60,7 @@ async def test_prepare_content_index_raise_error_when_index_creation_failed( "http://nowhere.com:9200/.elastic-connectors/_refresh", headers=headers ) mock_responses.head( - f"http://nowhere.com:9200/{index_name}?expand_wildcards=open", + f"http://nowhere.com:9200/{index_name}", headers=headers, status=404, ) @@ -90,7 +90,7 @@ async def test_prepare_content_index_create_index( "http://nowhere.com:9200/.elastic-connectors/_refresh", headers=headers ) mock_responses.head( - f"http://nowhere.com:9200/{index_name}?expand_wildcards=open", + f"http://nowhere.com:9200/{index_name}", headers=headers, status=404, ) @@ -126,16 +126,16 @@ async def test_prepare_content_index(mock_responses): mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) mock_responses.head( - "http://nowhere.com:9200/search-new-index?expand_wildcards=open", + "http://nowhere.com:9200/search-new-index", headers=headers, ) mock_responses.get( - "http://nowhere.com:9200/search-new-index/_mapping?expand_wildcards=open", + "http://nowhere.com:9200/search-new-index/_mapping", headers=headers, payload={"search-new-index": {"mappings": {}}}, ) mock_responses.put( - "http://nowhere.com:9200/search-new-index/_mapping?expand_wildcards=open", + "http://nowhere.com:9200/search-new-index/_mapping", headers=headers, payload=mappings, body='{"acknowledged": True}', @@ -159,7 +159,7 @@ def set_responses(mock_responses, ts=None): ts = datetime.datetime.now().isoformat() headers = {"X-Elastic-Product": "Elasticsearch"} mock_responses.head( - "http://nowhere.com:9200/search-some-index?expand_wildcards=open", + "http://nowhere.com:9200/search-some-index", headers=headers, ) From ef099f69df7689bd325fbca64e7ddf71b393c51d Mon Sep 17 00:00:00 2001 From: Artem Shelkovnikov Date: Wed, 3 Jan 2024 15:50:57 +0100 Subject: [PATCH 5/5] Fix linter stage --- connectors/cli/connector.py | 6 ++---- connectors/cli/job.py | 2 -- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index 15cb33f38..1a8213a6d 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -30,8 +30,7 @@ async def list_connectors(self): # TODO move this on top try: await self.es_management_client.ensure_exists( - indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], - expand_wildcards="all", + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] ) return [ @@ -92,8 +91,7 @@ async def __create_connector( ): try: await self.es_management_client.ensure_exists( - indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], - expand_wildcards="all", + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] ) timestamp = iso_utc() diff --git a/connectors/cli/job.py b/connectors/cli/job.py index ee1b713a7..5c5b6dc2b 100644 --- a/connectors/cli/job.py +++ b/connectors/cli/job.py @@ -38,7 +38,6 @@ async def __async_job(self, job_id): try: await self.es_management_client.ensure_exists( indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], - expand_wildcards="all", ) job = await self.sync_job_index.fetch_by_id(job_id) return job @@ -65,7 +64,6 @@ async def __async_list_jobs(self, connector_id, index_name, job_id): try: await self.es_management_client.ensure_exists( indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX], - expand_wildcards="all", ) jobs = self.sync_job_index.get_all_docs( query=self.__job_list_query(connector_id, index_name, job_id),