From e40f51f366c51636f800903740a21352790b6786 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Thu, 7 Dec 2023 14:17:31 +0100 Subject: [PATCH] Update settings in existing index for stateful es --- connectors/es/sink.py | 65 ++++++++++++++++++++++------------- connectors/sync_job_runner.py | 2 +- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/connectors/es/sink.py b/connectors/es/sink.py index b54cf977e..b1e7d4f61 100644 --- a/connectors/es/sink.py +++ b/connectors/es/sink.py @@ -631,54 +631,73 @@ def __init__(self, elastic_config, logger_=None): self._sink = None self._sink_task = None - async def prepare_content_index(self, index, language_code=None): + async def prepare_content_index(self, index_name, language_code=None): """Creates the index, given a mapping if it does not exists.""" - self._logger.debug(f"Checking index {index}") + self._logger.debug(f"Checking index {index_name}") - expand_wildcards = "open" - exists = await self.client.indices.exists( - index=index, expand_wildcards=expand_wildcards + result = await self.client.indices.get( + index=index_name, ignore_unavailable=True ) - mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) + index = result.get(index_name, None) - if exists: - # Update the index mappings if needed - self._logger.debug(f"{index} exists") - await self._ensure_content_index_mappings(index, mappings, expand_wildcards) + if index: + self._logger.debug(f"{index_name} exists") + await self._ensure_content_index_settings(index_name, index, language_code) + await self._ensure_content_index_mappings(index_name, index) else: # Create a new index - self._logger.info(f"Creating content index: {index}") + self._logger.info(f"Creating content index: {index_name}") await self._create_content_index( - index=index, language_code=language_code, mappings=mappings + index=index_name, language_code=language_code ) - self._logger.info(f"Content index successfully created: {index}") + 
self._logger.info(f"Content index successfully created: {index_name}") - async def _ensure_content_index_mappings(self, index, mappings, expand_wildcards): - response = await self.client.indices.get_mapping( - index=index, expand_wildcards=expand_wildcards - ) + async def _ensure_content_index_settings( + self, index_name, index, language_code=None + ): + existing_settings = index.get("settings", {}) + settings = Settings(language_code=language_code, analysis_icu=False).to_hash() + + if "analysis" not in existing_settings and settings: + self._logger.debug( + f"Index {index_name} has no settings or it's empty. Adding settings..." + ) + + await self.client.indices.close(index=index_name) + await self.client.indices.put_settings(index=index_name, body=settings) + await self.client.indices.open(index=index_name) + + self._logger.debug(f"Successfully added settings for index {index_name}") + else: + self._logger.debug( + f"Index {index_name} already has settings, skipping settings creation" + ) + + async def _ensure_content_index_mappings(self, index_name, index): + existing_mappings = index.get("mappings", {}) + mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) - existing_mappings = response[index].get("mappings", {}) if len(existing_mappings) == 0 and mappings: self._logger.debug( - "Index %s has no mappings or it's empty. Adding mappings...", index + "Index %s has no mappings or it's empty. 
Adding mappings...", + index_name, ) await self.client.indices.put_mapping( - index=index, + index=index_name, dynamic=mappings.get("dynamic", False), dynamic_templates=mappings.get("dynamic_templates", []), properties=mappings.get("properties", {}), - expand_wildcards=expand_wildcards, ) - self._logger.debug("Successfully added mappings for index %s", index) + self._logger.debug(f"Successfully added mappings for index {index_name}") else: self._logger.debug( - "Index %s already has mappings, skipping mappings creation", index + f"Index {index_name} already has mappings, skipping mappings creation" ) - async def _create_content_index(self, index, mappings, language_code=None): + async def _create_content_index(self, index, mappings=None, language_code=None): settings = Settings(language_code=language_code, analysis_icu=False).to_hash() + mappings = Mappings.default_text_fields_mappings(is_connectors_index=True) return await self.client.indices.create( index=index, mappings=mappings, settings=settings diff --git a/connectors/sync_job_runner.py b/connectors/sync_job_runner.py index 4266204be..ece3204f8 100644 --- a/connectors/sync_job_runner.py +++ b/connectors/sync_job_runner.py @@ -197,7 +197,7 @@ async def _execute_content_sync_job(self, job_type, bulk_options): logger.debug("Preparing the content index") await self.elastic_server.prepare_content_index( - index=self.sync_job.index_name, language_code=self.sync_job.language + index_name=self.sync_job.index_name, language_code=self.sync_job.language ) content_extraction_enabled = (