From af87c164621da97adbe22c73780ba8e229b90f07 Mon Sep 17 00:00:00 2001
From: Artem Shelkovnikov
Date: Mon, 8 Jan 2024 14:58:40 +0100
Subject: [PATCH 1/6] Make bin/connectors index without arguments output help
 (#2028)

---
 connectors/connectors_cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py
index 901b7253d..5e90d33bd 100644
--- a/connectors/connectors_cli.py
+++ b/connectors/connectors_cli.py
@@ -336,7 +336,7 @@ def prompt():
 # Index group
-@click.group(invoke_without_command=True, help="Search indices management")
+@click.group(invoke_without_command=False, help="Search indices management")
 @click.pass_obj
 def index(obj):
     pass
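A note on the semantics of the flag flipped above: with `invoke_without_command=False`, which is click's default, invoking a group without a subcommand makes click print the group's help text and exit rather than run the (empty) group callback. A minimal standalone sketch of that behavior, assuming only the `click` package and not the connectors CLI itself:

```python
# Minimal click sketch (illustrative, not the connectors CLI).
# `python sketch.py` with no subcommand prints the group help;
# `python sketch.py list` invokes the subcommand.
import click


@click.group(invoke_without_command=False, help="Search indices management")
def index():
    pass


@index.command("list", help="List indices")
def list_indices():
    click.echo("listing indices...")


if __name__ == "__main__":
    index()
```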
From b8fb143658746f65a0a0f413b8bce82c9fd8c5a3 Mon Sep 17 00:00:00 2001
From: Artem Shelkovnikov
Date: Mon, 8 Jan 2024 15:05:05 +0100
Subject: [PATCH 2/6] Make clean_index method proceed on conflicts (#2029)

---
 connectors/es/index.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/connectors/es/index.py b/connectors/es/index.py
index f379840b7..d0512ad1a 100644
--- a/connectors/es/index.py
+++ b/connectors/es/index.py
@@ -72,6 +72,7 @@ async def clean_index(self):
             index=self.index_name,
             body={"query": {"match_all": {}}},
             ignore_unavailable=True,
+            conflicts="proceed",
         )

     async def update(self, doc_id, doc, if_seq_no=None, if_primary_term=None):
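Background on the added flag: `delete_by_query` runs against a snapshot of the index, and any document modified between the snapshot and its deletion raises a version conflict, which aborts the whole request by default. `conflicts="proceed"` counts the conflicts and keeps deleting. A sketch of the same call, assuming the 8.x async Python client (the patch itself passes the query via `body=`):

```python
from elasticsearch import AsyncElasticsearch


async def clean_index(client: AsyncElasticsearch, index_name: str) -> int:
    """Delete every document in the index, tolerating concurrent writes."""
    resp = await client.delete_by_query(
        index=index_name,
        query={"match_all": {}},   # the patch passes the same query via `body=`
        conflicts="proceed",       # count version conflicts instead of aborting
        ignore_unavailable=True,   # a missing index is not an error
    )
    return resp["deleted"]
```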
""" - async for bucket in self._google_storage_client.api_call( - resource="buckets", - method="list", - full_response=True, - project=self._google_storage_client.user_project_id, - userProject=self._google_storage_client.user_project_id, - ): - yield bucket + if "*" in buckets: + async for bucket in self._google_storage_client.api_call( + resource="buckets", + method="list", + full_response=True, + project=self._google_storage_client.user_project_id, + userProject=self._google_storage_client.user_project_id, + ): + yield bucket + else: + for bucket in buckets: + yield {"items": [{"id": bucket, "name": bucket}]} async def fetch_blobs(self, buckets): """Fetches blobs stored in the bucket from Google Cloud Storage. @@ -378,11 +390,9 @@ async def get_docs(self, filtering=None): Yields: dictionary: Documents from Google Cloud Storage. """ - async for buckets in self.fetch_buckets(): - if not buckets.get("items"): - continue + async for bucket in self.fetch_buckets(self.configuration["buckets"]): async for blobs in self.fetch_blobs( - buckets=buckets, + buckets=bucket, ): for blob_document in self.get_blob_document(blobs=blobs): yield blob_document, partial(self.get_content, blob_document) diff --git a/tests/sources/fixtures/azure_blob_storage/connector.json b/tests/sources/fixtures/azure_blob_storage/connector.json index 19d3d7d5e..f3f32cc93 100644 --- a/tests/sources/fixtures/azure_blob_storage/connector.json +++ b/tests/sources/fixtures/azure_blob_storage/connector.json @@ -34,11 +34,18 @@ "type": "str", "value": "http://127.0.0.1:10000/devstoreaccount1" }, + "containers": { + "display": "textarea", + "label": "Azure Blob Storage containers", + "order": 4, + "type": "list", + "value": "*" + }, "retry_count": { "default_value": 3, "display": "numeric", "label": "Retries per request", - "order": 4, + "order": 5, "required": false, "type": "int", "ui_restrictions": ["advanced"], @@ -48,7 +55,7 @@ "default_value": 100, "display": "numeric", "label": "Maximum concurrent downloads", - "order": 5, + "order": 6, "required": false, "type": "int", "ui_restrictions": ["advanced"], @@ -63,7 +70,7 @@ "display": "toggle", "label": "Use text extraction service", "options": [], - "order": 6, + "order": 7, "required": true, "sensitive": false, "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. 
diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py
index 498c4b7c4..23087bd0a 100644
--- a/connectors/sources/google_cloud_storage.py
+++ b/connectors/sources/google_cloud_storage.py
@@ -172,11 +172,17 @@ def get_default_configuration(cls):
         """

         return {
+            "buckets": {
+                "display": "textarea",
+                "label": "Google Cloud Storage buckets",
+                "order": 1,
+                "type": "list",
+            },
             "service_account_credentials": {
                 "display": "textarea",
                 "label": "Google Cloud service account JSON",
                 "sensitive": True,
-                "order": 1,
+                "order": 2,
                 "type": "str",
             },
             "use_text_extraction_service": {
@@ -251,20 +257,26 @@ async def ping(self):
             )
             raise

-    async def fetch_buckets(self):
+    async def fetch_buckets(self, buckets):
         """Fetch the buckets from the Google Cloud Storage.

+        Args:
+            buckets (List): List of buckets.
+
         Yields:
             Dictionary: Contains the list of fetched buckets from Google Cloud Storage.
         """
-        async for bucket in self._google_storage_client.api_call(
-            resource="buckets",
-            method="list",
-            full_response=True,
-            project=self._google_storage_client.user_project_id,
-            userProject=self._google_storage_client.user_project_id,
-        ):
-            yield bucket
+        if "*" in buckets:
+            async for bucket in self._google_storage_client.api_call(
+                resource="buckets",
+                method="list",
+                full_response=True,
+                project=self._google_storage_client.user_project_id,
+                userProject=self._google_storage_client.user_project_id,
+            ):
+                yield bucket
+        else:
+            for bucket in buckets:
+                yield {"items": [{"id": bucket, "name": bucket}]}

     async def fetch_blobs(self, buckets):
         """Fetches blobs stored in the bucket from Google Cloud Storage.
@@ -378,11 +390,9 @@ async def get_docs(self, filtering=None):
         Yields:
             dictionary: Documents from Google Cloud Storage.
         """
-        async for buckets in self.fetch_buckets():
-            if not buckets.get("items"):
-                continue
+        async for bucket in self.fetch_buckets(self.configuration["buckets"]):
             async for blobs in self.fetch_blobs(
-                buckets=buckets,
+                buckets=bucket,
             ):
                 for blob_document in self.get_blob_document(blobs=blobs):
                     yield blob_document, partial(self.get_content, blob_document)
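The reworked `fetch_buckets` above has two paths: a `"*"` wildcard falls through to the `buckets.list` API, while explicitly configured names skip the API call entirely and are wrapped in the same `{"items": [...]}` page shape the rest of the connector consumes. A distilled sketch of that control flow, with the pager argument as a stand-in for the real client:

```python
async def fetch_buckets(buckets, api_list_buckets):
    """Yield bucket pages, hitting the API only for the "*" wildcard."""
    if "*" in buckets:
        async for page in api_list_buckets():  # hypothetical async pager
            yield page
    else:
        for name in buckets:
            # Synthesized page: downstream code only reads id/name per item.
            yield {"items": [{"id": name, "name": name}]}
```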
diff --git a/tests/sources/fixtures/azure_blob_storage/connector.json b/tests/sources/fixtures/azure_blob_storage/connector.json
index 19d3d7d5e..f3f32cc93 100644
--- a/tests/sources/fixtures/azure_blob_storage/connector.json
+++ b/tests/sources/fixtures/azure_blob_storage/connector.json
@@ -34,11 +34,18 @@
       "type": "str",
       "value": "http://127.0.0.1:10000/devstoreaccount1"
     },
+    "containers": {
+      "display": "textarea",
+      "label": "Azure Blob Storage containers",
+      "order": 4,
+      "type": "list",
+      "value": "*"
+    },
     "retry_count": {
       "default_value": 3,
       "display": "numeric",
       "label": "Retries per request",
-      "order": 4,
+      "order": 5,
       "required": false,
       "type": "int",
       "ui_restrictions": ["advanced"],
@@ -48,7 +55,7 @@
       "default_value": 100,
       "display": "numeric",
       "label": "Maximum concurrent downloads",
-      "order": 5,
+      "order": 6,
       "required": false,
       "type": "int",
       "ui_restrictions": ["advanced"],
@@ -63,7 +70,7 @@
       "display": "toggle",
       "label": "Use text extraction service",
       "options": [],
-      "order": 6,
+      "order": 7,
       "required": true,
       "sensitive": false,
       "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.",

diff --git a/tests/sources/fixtures/google_cloud_storage/connector.json b/tests/sources/fixtures/google_cloud_storage/connector.json
index bd7c6295b..74bde35fb 100644
--- a/tests/sources/fixtures/google_cloud_storage/connector.json
+++ b/tests/sources/fixtures/google_cloud_storage/connector.json
@@ -1,6 +1,13 @@
 {
   "api_key_id": null,
   "configuration": {
+    "buckets": {
+      "display": "textarea",
+      "label": "Google Cloud Storage buckets",
+      "order": 1,
+      "type": "list",
+      "value": "*"
+    },
     "service_account_credentials": {
       "depends_on": [],
       "display": "textarea",
       "tooltip": "",
       "default_value": null,
       "label": "Google Cloud service account JSON",
       "sensitive": true,
       "type": "str",
       "required": true,
       "options": [],
       "validations": [],
       "value": "{\"type\": \"service_account\", \"project_id\": \"dummy_project_id\", \"private_key_id\": \"abc\", \"private_key\": \"\\n-----BEGIN PRIVATE KEY-----\\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\\nNtBLSfcqPULqD+h7br9lEJio\\n-----END PRIVATE KEY-----\\n\", \"client_email\": \"123-abc@developer.gserviceaccount.com\", \"client_id\": \"123-abc.apps.googleusercontent.com\", \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\", \"token_uri\": \"http://localhost:4444/token\"}",
-      "order": 1,
+      "order": 2,
       "ui_restrictions": []
     },
-    "retry_count": {
-      "depends_on": [],
-      "display": "numeric",
-      "tooltip": "",
-      "default_value": 3,
-      "label": "Maximum retries for failed requests",
-      "sensitive": false,
-      "type": "int",
-      "required": false,
-      "options": [],
-      "validations": [
-        {
-          "constraint": 0,
-          "type": "greater_than"
-        }
-      ],
-      "value": null,
-      "order": 2,
-      "ui_restrictions": [
-        "advanced"
-      ]
-    },
     "use_text_extraction_service": {
       "default_value": null,
       "depends_on": [],
       "display": "toggle",
       "label": "Use text extraction service",
       "options": [],
-      "order": 7,
+      "order": 3,
       "required": true,
       "sensitive": false,
       "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.",
diff --git a/tests/sources/test_azure_blob_storage.py b/tests/sources/test_azure_blob_storage.py
index bee23b15e..aa64dc5b1 100644
--- a/tests/sources/test_azure_blob_storage.py
+++ b/tests/sources/test_azure_blob_storage.py
@@ -139,12 +139,17 @@ async def test_get_container():
     with patch.object(
         BlobServiceClient, "list_containers", return_value=mock_repsonse
     ):
-        expected_output = {"name": "container1", "metadata": {"key1": "value1"}}
+        expected_output = [
+            {"name": "container1", "metadata": {"key1": "value1"}},
+            None,
+        ]

         # Execute
-        async for actual_document in source.get_container():
+        async for actual_document in source.get_container(
+            container_list=["container1"]
+        ):
             # Assert
-            assert actual_document == expected_output
+            assert actual_document in expected_output

@@ -208,12 +213,25 @@ async def test_get_blob_negative():
             assert actual_document is None


+@pytest.mark.asyncio
+async def test_get_container_negative():
+    """Test get_container negative method of AzureBlobStorageDataSource Class"""
+
+    async with create_abs_source() as source:
+        source.connection_string = source._configure_connection_string()
+        async for actual_document in source.get_container(
+            container_list=["container1"]
+        ):
+            assert actual_document is None
+
+
 @pytest.mark.asyncio
 async def test_get_doc():
     """Test get_doc method of AzureBlobStorageDataSource Class"""

     # Setup
     async with create_abs_source() as source:
+        source.containers = ["*"]
         source.get_container = Mock(
             return_value=AsyncIterator(
                 [
@@ -275,6 +293,83 @@ async def test_get_doc():
         assert actual_response == expected_response


+async def create_fake_coroutine(item):
+    """Create a fake coroutine returning the given value.
+
+    Args:
+        item: Value for converting into coroutine
+    """
+    return item
+
+
+@pytest.mark.asyncio
+async def test_get_doc_for_specific_container():
+    """Test get_doc for specific container method of AzureBlobStorageDataSource Class"""
+
+    # Setup
+    async with create_abs_source() as source:
+        source.containers = ["container1"]
+        source.get_blob = Mock(
+            return_value=AsyncIterator(
+                [
+                    {
+                        "type": "blob",
+                        "_id": "container1/blob1",
+                        "timestamp": "2022-04-21T12:12:30",
+                        "created at": "2022-04-21T12:12:30",
+                        "content type": "plain/text",
+                        "container metadata": "{'key1': 'value1'}",
+                        "metadata": "{'key1': 'value1', 'key2': 'value2'}",
+                        "leasedata": "{'status': 'Locked', 'state': 'Leased', 'duration': 'Infinite'}",
+                        "title": "blob1",
+                        "tier": "private",
+                        "size": 1000,
+                        "container": "container1",
+                    }
+                ]
+            )
+        )
+
+        source.get_container = Mock(
+            return_value=AsyncIterator(
+                [
+                    {
+                        "type": "container",
+                        "_id": "container1",
+                        "timestamp": "2022-04-21T12:12:30",
+                        "metadata": "key1=value1",
+                        "leasedata": "{'status': 'Locked', 'state': 'Leased', 'duration': 'Infinite'}",
+                        "title": "container1",
+                        "access": "private",
+                    }
+                ]
+            )
+        )
+        expected_response = [
+            {
+                "type": "blob",
+                "_id": "container1/blob1",
+                "timestamp": "2022-04-21T12:12:30",
+                "created at": "2022-04-21T12:12:30",
+                "content type": "plain/text",
+                "container metadata": "{'key1': 'value1'}",
+                "metadata": "{'key1': 'value1', 'key2': 'value2'}",
+                "leasedata": "{'status': 'Locked', 'state': 'Leased', 'duration': 'Infinite'}",
+                "title": "blob1",
+                "tier": "private",
+                "size": 1000,
+                "container": "container1",
+            }
+        ]
+        actual_response = []
+
+        # Execute
+        async for document, _ in source.get_docs():
+            actual_response.append(document)
+
+        # Assert
+        assert actual_response == expected_response
+
+
 @pytest.mark.asyncio
 async def test_get_content():
     """Test get_content method of AzureBlobStorageDataSource Class"""
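These tests lean on an `AsyncIterator` helper so a plain `Mock` can stand in for an async generator method. The real helper lives in the repo's test utilities; a minimal equivalent, shown here only for orientation, looks roughly like this:

```python
from unittest.mock import Mock


class AsyncIterator:
    """Wrap a list so it can be consumed with `async for`."""

    def __init__(self, items):
        self._iter = iter(items)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self._iter)
        except StopIteration:
            raise StopAsyncIteration


# Usage, as in the tests above:
# source.get_container = Mock(return_value=AsyncIterator([{"name": "container1"}]))
# One `async for` pass over source.get_container(...) then yields the canned items.
```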
diff --git a/tests/sources/test_google_cloud_storage.py b/tests/sources/test_google_cloud_storage.py
index 1f496a9c4..6c98f4eae 100644
--- a/tests/sources/test_google_cloud_storage.py
+++ b/tests/sources/test_google_cloud_storage.py
@@ -192,7 +192,7 @@ async def test_fetch_buckets():
     ):
         with mock.patch.object(ServiceAccountManager, "refresh"):
             bucket_list = []
-            async for bucket in source.fetch_buckets():
+            async for bucket in source.fetch_buckets(["*"]):
                 bucket_list.append(bucket)
             assert bucket_list == expected_bucket_list

@@ -275,6 +275,58 @@ async def test_get_docs():
     """Tests the module responsible to fetch and yield blobs documents from Google Cloud Storage."""

     async with create_gcs_source() as source:
+        source.configuration.get_field("buckets").value = ["*"]
         expected_response = {
             "kind": "storage#objects",
             "items": [
                 {
                     "kind": "storage#object",
                     "id": "bucket_1/blob_1/123123123",
                     "updated": "2011-10-12T00:01:00Z",
                     "name": "blob_1",
                 }
             ],
         }
         expected_blob_document = {
             "_id": "bucket_1/blob_1/123123123",
             "component_count": None,
             "content_encoding": None,
             "content_language": None,
             "created_at": None,
             "last_updated": "2011-10-12T00:01:00Z",
             "metadata": None,
             "name": "blob_1",
             "size": None,
             "storage_class": None,
             "_timestamp": "2011-10-12T00:01:00Z",
             "type": None,
             "url": "https://console.cloud.google.com/storage/browser/_details/None/blob_1;tab=live_object?project=dummy123",
             "version": None,
             "bucket_name": None,
         }
         dummy_url = "https://dummy.gcs.com/buckets/b1/objects"

         expected_response_object = Response(
             status_code=200,
             url=dummy_url,
             json=expected_response,
             req=Request(method="GET", url=dummy_url),
         )

         with mock.patch.object(
             Aiogoogle, "as_service_account", return_value=expected_response_object
         ):
             with mock.patch.object(ServiceAccountManager, "refresh"):
                 async for blob_document in source.get_docs():
                     assert blob_document[0] == expected_blob_document


+@pytest.mark.asyncio
+async def test_get_docs_with_specific_bucket():
+    """Tests the module responsible to fetch and yield blobs documents from Google Cloud Storage."""
+
+    async with create_gcs_source() as source:
+        source.configuration.get_field("buckets").value = ["test_bucket"]
+        expected_response = {
+            "kind": "storage#objects",
+            "items": [

From 2ce504e547bde2c1a8d0a26f51fbf436de4cc826 Mon Sep 17 00:00:00 2001
From: parthpuri-elastic <150776158+parthpuri-elastic@users.noreply.github.com>
Date: Tue, 9 Jan 2024 14:27:55 +0530
Subject: [PATCH 4/6] [GitHub] Fix scopes related issue (#1997)

Co-authored-by: Praveen Kukreja <90465691+praveen-elastic@users.noreply.github.com>
---
 connectors/sources/github.py             |  20 ++++++++++++--
 tests/sources/fixtures/github/fixture.py |   2 +-
 tests/sources/test_github.py             |  34 ++++++++++++++++++++--
 3 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/connectors/sources/github.py b/connectors/sources/github.py
index cb739bcd6..ee1c6c44f 100644
--- a/connectors/sources/github.py
+++ b/connectors/sources/github.py
@@ -1196,9 +1196,25 @@ async def _remote_validation(self):
         _, headers = await self.github_client.post(  # pyright: ignore
             query_data=query_data, need_headers=True
         )
-        if "repo" not in headers.get("X-OAuth-Scopes"):  # pyright: ignore
-            msg = "Configured token does not have required rights to fetch the content"
+
+        scopes = headers.get("X-OAuth-Scopes", "")
+        scopes = {scope.strip() for scope in scopes.split(",")}
+        required_scopes = {"repo", "user", "read:org"}
+
+        for scope in ["write:org", "admin:org"]:
+            if scope in scopes:
+                scopes.add("read:org")
+
+        if required_scopes.issubset(scopes):
+            extra_scopes = scopes - required_scopes
+            if extra_scopes:
+                self._logger.warning(
+                    "The provided token has higher privileges than required. It is advisable to run the connector with least privileged token. Required scopes are 'repo', 'user', and 'read:org'."
+                )
+        else:
+            msg = "Configured token does not have required rights to fetch the content. Required scopes are 'repo', 'user', and 'read:org'."
             raise ConfigurableFieldValueError(msg)
+
         if self.github_client.repos != [WILDCARD]:
             invalid_repos = await self.get_invalid_repos()
             if invalid_repos:

diff --git a/tests/sources/fixtures/github/fixture.py b/tests/sources/fixtures/github/fixture.py
index 6a4a508e0..d092e8e28 100644
--- a/tests/sources/fixtures/github/fixture.py
+++ b/tests/sources/fixtures/github/fixture.py
@@ -236,7 +236,7 @@ def mock_graphql_response(self):
         if "viewer{login}" in query:
             mock_data = make_response({"data": {"viewer": {"login": "demo_repo"}}})
             mock_data.status_code = 200
-            mock_data.headers["X-OAuth-Scopes"] = ["repo"]
+            mock_data.headers["X-OAuth-Scopes"] = "repo, user, read:org"
         elif "repositories" in query:
             start_index, end_index, subset_nodes = self.get_index_metadata(
                 variables, repos_data
             )

diff --git a/tests/sources/test_github.py b/tests/sources/test_github.py
index b3e3ee5f0..8d0cc5d3d 100644
--- a/tests/sources/test_github.py
+++ b/tests/sources/test_github.py
@@ -587,11 +587,26 @@ async def test_ping_with_unsuccessful_connection():

 @pytest.mark.asyncio
-@patch("connectors.utils.time_to_sleep_between_retries", Mock(return_value=0))
 async def test_validate_config_with_invalid_token_then_raise():
+    async with create_github_source() as source:
+        with patch.object(
+            source.github_client,
+            "post",
+            side_effect=UnauthorizedException(),
+        ):
+            with pytest.raises(UnauthorizedException):
+                await source.validate_config()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "scopes",
+    ["", "repo", "manage_runner:org, delete:packages, admin:public_key"],
+)
+async def test_validate_config_with_insufficient_scope(scopes):
     async with create_github_source() as source:
         source.github_client.post = AsyncMock(
-            return_value=({"user": "username"}, {"X-OAuth-Scopes": ""})
+            return_value=({"user": "username"}, {"X-OAuth-Scopes": scopes})
         )
         with pytest.raises(
             ConfigurableFieldValueError,
         ):
             await source.validate_config()


+@pytest.mark.asyncio
+async def test_validate_config_with_extra_scopes_token(patch_logger):
+    async with create_github_source() as source:
+        source.github_client.post = AsyncMock(
+            return_value=(
+                {"user": "username"},
+                {"X-OAuth-Scopes": "user, repo, admin:org"},
+            )
+        )
+        await source.validate_config()
+        patch_logger.assert_present(
+            "The provided token has higher privileges than required. It is advisable to run the connector with least privileged token. Required scopes are 'repo', 'user', and 'read:org'."
+        )
+
+
 @pytest.mark.asyncio
 @patch("connectors.utils.time_to_sleep_between_retries", Mock(return_value=0))
 async def test_validate_config_with_inaccessible_repositories_then_raise():
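Condensed, the new validation parses the `X-OAuth-Scopes` response header into a set, treats the broader `write:org` and `admin:org` scopes as implying `read:org`, and then requires the full `{"repo", "user", "read:org"}` set, warning when anything extra is present. A standalone sketch that mirrors the patch without being a drop-in replacement:

```python
REQUIRED_SCOPES = {"repo", "user", "read:org"}


def check_scopes(header_value):
    """Return extra scopes beyond the required set; raise if any are missing."""
    scopes = {scope.strip() for scope in header_value.split(",") if scope.strip()}
    if scopes & {"write:org", "admin:org"}:  # broader org scopes imply read:org
        scopes.add("read:org")
    if not REQUIRED_SCOPES.issubset(scopes):
        raise ValueError("token is missing one of: repo, user, read:org")
    return scopes - REQUIRED_SCOPES  # non-empty means an over-privileged token


assert check_scopes("repo, user, read:org") == set()
assert check_scopes("user, repo, admin:org") == {"admin:org"}
```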
+ ) + + @pytest.mark.asyncio @patch("connectors.utils.time_to_sleep_between_retries", Mock(return_value=0)) async def test_validate_config_with_inaccessible_repositories_then_raise(): From e377d015a6b41221944f1f29aec217f8b158c6fb Mon Sep 17 00:00:00 2001 From: Dmitriy Burlutskiy Date: Tue, 9 Jan 2024 11:01:52 +0100 Subject: [PATCH 5/6] [CLI] Add documentation (#2031) --- README.md | 1 + docs/CLI.md | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 docs/CLI.md diff --git a/README.md b/README.md index 5a637cbb3..94c24b8f0 100644 --- a/README.md +++ b/README.md @@ -57,5 +57,6 @@ The framework serves two distinct, but related use cases: - [Elastic-internal guide](docs/INTERNAL.md) - [Connector Protocol](docs/CONNECTOR_PROTOCOL.md) - [Configuration](docs/CONFIG.md) +- [Command line interface](docs/CLI.md) - [Contribution guide](docs/CONTRIBUTING.md) - [Upgrading](docs/UPGRADING.md) diff --git a/docs/CLI.md b/docs/CLI.md new file mode 100644 index 000000000..100453376 --- /dev/null +++ b/docs/CLI.md @@ -0,0 +1,133 @@ +Connectors CLI is a command-line interface to Elastic Connectors for use in your terminal or your scripts. + +**Warning:** +Connectors CLI in tech preview. + +- [Installation](#installation) +- [Configuration](#configuration) +- [Available commands](#available-commands) +- [Known bugs and limitations](#known-bugs-and-limitations) + +## Installation +1. Clone the repository `git clone https://github.com/elastic/connectors.git` +2. Run `make clean install` to install dependencies and create executable files. +3. Connectors CLI is available via `./bin/connectors` + +## Configuration +**Note:** Make sure your Elasticsearch instance is up and running. + +1. Run `./bin/connectors login` to authenticate the CLI with an Elasticsearch instance. +2. Provide credentials +3. The command will create or ask to rewrite an existing configuration file in `./cli/config.yml` + +When you run any command you can specify a configuration file using `-c` argument. +Example: + +```bash +./bin/connectors -c connector list +``` + +## Available commands +### Getting help +Connectors CLI provides `--help` argument that can be used with any command to get more information. + +For example: +```bash +./bin/connectors --help + + +Usage: connectors [OPTIONS] COMMAND [ARGS]... + +Options: + -v, --version Show the version and exit. + -c, --config FILENAME + --help Show this message and exit. + +Commands: + connector Connectors management + index Search indices management + job Sync jobs management + login Authenticate Connectors CLI with an Elasticsearch instance +``` + +### Commands list + + - [connectors connector create](#connectors-connector-create) + - [connectors connector list](#connectors-connector-list) + - [connectors job list](#connectors-job-list) + - [connectors job cancel](#connectors-job-cancel) + - [connectors job start](#connectors-job-start) + - [connectors job view](#connectors-job-view) + + +#### connectors connector create +Creates a new connector and links it to an Elasticsearch index. When executing the command you will be asked to provide a connector configuration based on the service type you selected. For instance, you will be asked for host, username, and password if you select `mysql`. + +To bypass interactive mode you can pass `--from-file` argument pointing to a key-value JSON file with connectors configuration. 
#### connectors connector list

Lists all the existing connectors.

Examples:

```bash
./bin/connectors connector list
```

It will display all existing connectors and the associated indices.

#### connectors job list
Lists all jobs and their stats.

Examples:
```bash
./bin/connectors job list -- <connector_id>
```

It will display all sync jobs, including information like job status, number of indexed documents, and index data volume, associated with `connector_id`.

#### connectors job cancel
Marks the job as `cancelling` to let Connector services know that the job has to be canceled.

Examples:

```bash
./bin/connectors job cancel -- <job_id>
```

#### connectors job start
Schedules a new sync job and lets Connector service pick it up.

Examples:

```bash
./bin/connectors job start -- -i <connector_id> -t <job_type> -o <format>
```

It will schedule a new sync job using job type and connector id. The outcome of the command contains a job id.

#### connectors job view
Shows information about a sync job.

Examples:

```bash
./bin/connectors job view -- <job_id> -o <format>
```

Date: Tue, 9 Jan 2024 11:27:19 +0100
Subject: [PATCH 6/6] [Google Drive] Fix permission fetching bug for domain-wide
 delegation sync (#2024)

---
 connectors/sources/google_drive.py | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/connectors/sources/google_drive.py b/connectors/sources/google_drive.py
index c3c8c6414..ead18ac16 100644
--- a/connectors/sources/google_drive.py
+++ b/connectors/sources/google_drive.py
@@ -172,7 +172,13 @@ async def list_files(self, fetch_permissions=False):
             yield file

     async def list_files_from_my_drive(self, fetch_permissions=False):
-        """Get files from Google Drive. Files can have any type.
+        """Retrieves files from Google Drive, with an option to fetch permissions (DLS).
+
+        This function optimizes the retrieval process based on the 'fetch_permissions' flag.
+        If 'fetch_permissions' is True, the function filters for files the user can edit
+        ("trashed=false and 'me' in writers") as permission fetching requires write access.
+        If 'fetch_permissions' is False, it simply filters out trashed files ("trashed=false"),
+        allowing a broader file retrieval.

         Args:
             fetch_permissions (bool): flag to select permissions in the request query

         Yields:
             dict: Documents from Google Drive.
         """

-        files_fields = (
-            DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
-            if fetch_permissions
-            else DRIVE_ITEMS_FIELDS
-        )
+        if fetch_permissions:
+            files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
+            # Google Drive API requires write access to fetch a file's permissions
+            list_query = "trashed=false and 'me' in writers"
+        else:
+            files_fields = DRIVE_ITEMS_FIELDS
+            list_query = "trashed=false"

         async for file in self.api_call_paged(
             resource="files",
             method="list",
             corpora="user",
-            q="trashed=false",
+            q=list_query,
             orderBy="modifiedTime desc",
             fields=f"files({files_fields}),incompleteSearch,nextPageToken",
             includeItemsFromAllDrives=False,
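The shape of the fix is easiest to see in isolation: listing a file's permissions needs write access, so the `files.list` query is narrowed to writable files only when permission fetching (DLS) is enabled. A distilled sketch of the parameter selection, with the field constants as placeholders rather than the connector's actual values:

```python
DRIVE_ITEMS_FIELDS = "id,name,mimeType,modifiedTime"  # placeholder value
DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS = f"{DRIVE_ITEMS_FIELDS},permissions"


def my_drive_list_params(fetch_permissions):
    """Build files.list query params for the two sync modes."""
    if fetch_permissions:
        # Permissions are only readable on files the user can write to.
        return {
            "q": "trashed=false and 'me' in writers",
            "fields": f"files({DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS}),incompleteSearch,nextPageToken",
        }
    return {
        "q": "trashed=false",
        "fields": f"files({DRIVE_ITEMS_FIELDS}),incompleteSearch,nextPageToken",
    }
```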