diff --git a/README.md b/README.md index 5a637cbb3..94c24b8f0 100644 --- a/README.md +++ b/README.md @@ -57,5 +57,6 @@ The framework serves two distinct, but related use cases: - [Elastic-internal guide](docs/INTERNAL.md) - [Connector Protocol](docs/CONNECTOR_PROTOCOL.md) - [Configuration](docs/CONFIG.md) +- [Command line interface](docs/CLI.md) - [Contribution guide](docs/CONTRIBUTING.md) - [Upgrading](docs/UPGRADING.md) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 901b7253d..5e90d33bd 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -336,7 +336,7 @@ def prompt(): # Index group -@click.group(invoke_without_command=True, help="Search indices management") +@click.group(invoke_without_command=False, help="Search indices management") @click.pass_obj def index(obj): pass diff --git a/connectors/es/index.py b/connectors/es/index.py index f379840b7..d0512ad1a 100644 --- a/connectors/es/index.py +++ b/connectors/es/index.py @@ -72,6 +72,7 @@ async def clean_index(self): index=self.index_name, body={"query": {"match_all": {}}}, ignore_unavailable=True, + conflicts="proceed", ) async def update(self, doc_id, doc, if_seq_no=None, if_primary_term=None): diff --git a/connectors/sources/azure_blob_storage.py b/connectors/sources/azure_blob_storage.py index 99d7f5133..0fc6c67e1 100644 --- a/connectors/sources/azure_blob_storage.py +++ b/connectors/sources/azure_blob_storage.py @@ -38,6 +38,7 @@ def __init__(self, configuration): self.connection_string = None self.retry_count = self.configuration["retry_count"] self.concurrent_downloads = self.configuration["concurrent_downloads"] + self.containers = self.configuration["containers"] def tweak_bulk_options(self, options): """Tweak bulk options as per concurrent downloads support by azure blob storage @@ -73,11 +74,17 @@ def get_default_configuration(cls): "order": 3, "type": "str", }, + "containers": { + "display": "textarea", + "label": "Azure Blob Storage 
containers", + "order": 4, + "type": "list", + }, "retry_count": { "default_value": DEFAULT_RETRY_COUNT, "display": "numeric", "label": "Retries per request", - "order": 4, + "order": 5, "required": False, "type": "int", "ui_restrictions": ["advanced"], @@ -86,7 +93,7 @@ def get_default_configuration(cls): "default_value": MAX_CONCURRENT_DOWNLOADS, "display": "numeric", "label": "Maximum concurrent downloads", - "order": 5, + "order": 6, "required": False, "type": "int", "ui_restrictions": ["advanced"], @@ -97,7 +104,7 @@ def get_default_configuration(cls): "use_text_extraction_service": { "display": "toggle", "label": "Use text extraction service", - "order": 6, + "order": 7, "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", "type": "bool", "ui_restrictions": ["advanced"], @@ -196,22 +203,39 @@ async def blob_download_func(self, blob_name, container_name): async for content in data.chunks(): yield content - async def get_container(self): + async def get_container(self, container_list): """Get containers from Azure Blob Storage via azure base client + Args: + container_list (list): List of containers Yields: dictionary: Container document with name & meta data """ + container_set = set(container_list) async with BlobServiceClient.from_connection_string( conn_str=self.connection_string, retry_total=self.retry_count ) as azure_base_client: - async for container in azure_base_client.list_containers( - include_metadata=True - ): - yield { - "name": container["name"], - "metadata": container["metadata"], - } + try: + async for container in azure_base_client.list_containers( + include_metadata=True + ): + if "*" in container_set or container["name"] in container_set: + yield { + "name": container["name"], + "metadata": container["metadata"], + } + if "*" not in container_set and container["name"] in container_set: + container_set.remove(container["name"]) + if not container_set: + 
return
+ if container_set and "*" not in container_set: + self._logger.warning( + f"Container(s) {','.join(container_set)} are configured but not found." + ) + except Exception as exception: + self._logger.warning( + f"Something went wrong while fetching containers. Error: {exception}" + ) async def get_blob(self, container): """Get blobs for a specific container from Azure Blob Storage via container client @@ -243,6 +267,7 @@ async def get_docs(self, filtering=None): Yields: dictionary: Documents from Azure Blob Storage """ - async for container in self.get_container(): - async for blob in self.get_blob(container=container): - yield blob, partial(self.get_content, blob) + async for container_data in self.get_container(container_list=self.containers): + if container_data: + async for blob in self.get_blob(container=container_data): + yield blob, partial(self.get_content, blob) diff --git a/connectors/sources/github.py b/connectors/sources/github.py index cb739bcd6..ee1c6c44f 100644 --- a/connectors/sources/github.py +++ b/connectors/sources/github.py @@ -1196,9 +1196,25 @@ async def _remote_validation(self): _, headers = await self.github_client.post( # pyright: ignore query_data=query_data, need_headers=True ) - if "repo" not in headers.get("X-OAuth-Scopes"): # pyright: ignore - msg = "Configured token does not have required rights to fetch the content" + + scopes = headers.get("X-OAuth-Scopes", "") + scopes = {scope.strip() for scope in scopes.split(",")} + required_scopes = {"repo", "user", "read:org"} + + for scope in ["write:org", "admin:org"]: + if scope in scopes: + scopes.add("read:org") + + if required_scopes.issubset(scopes): + extra_scopes = scopes - required_scopes + if extra_scopes: + self._logger.warning( + "The provided token has higher privileges than required. It is advisable to run the connector with least privileged token. Required scopes are 'repo', 'user', and 'read:org'." 
+ ) + else: + msg = "Configured token does not have required rights to fetch the content. Required scopes are 'repo', 'user', and 'read:org'." raise ConfigurableFieldValueError(msg) + if self.github_client.repos != [WILDCARD]: invalid_repos = await self.get_invalid_repos() if invalid_repos: diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index 498c4b7c4..23087bd0a 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -172,11 +172,17 @@ def get_default_configuration(cls): """ return { + "buckets": { + "display": "textarea", + "label": "Google Cloud Storage buckets", + "order": 1, + "type": "list", + }, "service_account_credentials": { "display": "textarea", "label": "Google Cloud service account JSON", "sensitive": True, - "order": 1, + "order": 2, "type": "str", }, "use_text_extraction_service": { @@ -251,20 +257,26 @@ async def ping(self): ) raise - async def fetch_buckets(self): + async def fetch_buckets(self, buckets): """Fetch the buckets from the Google Cloud Storage. + Args: + buckets (List): List of buckets. Yields: Dictionary: Contains the list of fetched buckets from Google Cloud Storage. """ - async for bucket in self._google_storage_client.api_call( - resource="buckets", - method="list", - full_response=True, - project=self._google_storage_client.user_project_id, - userProject=self._google_storage_client.user_project_id, - ): - yield bucket + if "*" in buckets: + async for bucket in self._google_storage_client.api_call( + resource="buckets", + method="list", + full_response=True, + project=self._google_storage_client.user_project_id, + userProject=self._google_storage_client.user_project_id, + ): + yield bucket + else: + for bucket in buckets: + yield {"items": [{"id": bucket, "name": bucket}]} async def fetch_blobs(self, buckets): """Fetches blobs stored in the bucket from Google Cloud Storage. 
@@ -378,11 +390,9 @@ async def get_docs(self, filtering=None): Yields: dictionary: Documents from Google Cloud Storage. """ - async for buckets in self.fetch_buckets(): - if not buckets.get("items"): - continue + async for bucket in self.fetch_buckets(self.configuration["buckets"]): async for blobs in self.fetch_blobs( - buckets=buckets, + buckets=bucket, ): for blob_document in self.get_blob_document(blobs=blobs): yield blob_document, partial(self.get_content, blob_document) diff --git a/connectors/sources/google_drive.py b/connectors/sources/google_drive.py index c3c8c6414..ead18ac16 100644 --- a/connectors/sources/google_drive.py +++ b/connectors/sources/google_drive.py @@ -172,7 +172,13 @@ async def list_files(self, fetch_permissions=False): yield file async def list_files_from_my_drive(self, fetch_permissions=False): - """Get files from Google Drive. Files can have any type. + """Retrieves files from Google Drive, with an option to fetch permissions (DLS). + + This function optimizes the retrieval process based on the 'fetch_permissions' flag. + If 'fetch_permissions' is True, the function filters for files the user can edit + ("trashed=false and 'me' in writers") as permission fetching requires write access. + If 'fetch_permissions' is False, it simply filters out trashed files ("trashed=false"), + allowing a broader file retrieval. Args: include_permissions (bool): flag to select permissions in the request query @@ -181,17 +187,19 @@ async def list_files_from_my_drive(self, fetch_permissions=False): dict: Documents from Google Drive. 
""" - files_fields = ( - DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS - if fetch_permissions - else DRIVE_ITEMS_FIELDS - ) + if fetch_permissions: + files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS + # Google Drive API required write access to fetch file's permissions + list_query = "trashed=false and 'me' in writers" + else: + files_fields = DRIVE_ITEMS_FIELDS + list_query = "trashed=false" async for file in self.api_call_paged( resource="files", method="list", corpora="user", - q="trashed=false", + q=list_query, orderBy="modifiedTime desc", fields=f"files({files_fields}),incompleteSearch,nextPageToken", includeItemsFromAllDrives=False, diff --git a/docs/CLI.md b/docs/CLI.md new file mode 100644 index 000000000..100453376 --- /dev/null +++ b/docs/CLI.md @@ -0,0 +1,133 @@ +Connectors CLI is a command-line interface to Elastic Connectors for use in your terminal or your scripts. + +**Warning:** +Connectors CLI in tech preview. + +- [Installation](#installation) +- [Configuration](#configuration) +- [Available commands](#available-commands) +- [Known bugs and limitations](#known-bugs-and-limitations) + +## Installation +1. Clone the repository `git clone https://github.com/elastic/connectors.git` +2. Run `make clean install` to install dependencies and create executable files. +3. Connectors CLI is available via `./bin/connectors` + +## Configuration +**Note:** Make sure your Elasticsearch instance is up and running. + +1. Run `./bin/connectors login` to authenticate the CLI with an Elasticsearch instance. +2. Provide credentials +3. The command will create or ask to rewrite an existing configuration file in `./cli/config.yml` + +When you run any command you can specify a configuration file using `-c` argument. +Example: + +```bash +./bin/connectors -c connector list +``` + +## Available commands +### Getting help +Connectors CLI provides `--help` argument that can be used with any command to get more information. 
+
For example:
```bash
./bin/connectors --help


Usage: connectors [OPTIONS] COMMAND [ARGS]...

Options:
  -v, --version Show the version and exit.
  -c, --config FILENAME
  --help Show this message and exit.

Commands:
  connector Connectors management
  index Search indices management
  job Sync jobs management
  login Authenticate Connectors CLI with an Elasticsearch instance
```

### Commands list

 - [connectors connector create](#connectors-connector-create)
 - [connectors connector list](#connectors-connector-list)
 - [connectors job list](#connectors-job-list)
 - [connectors job cancel](#connectors-job-cancel)
 - [connectors job start](#connectors-job-start)
 - [connectors job view](#connectors-job-view)


#### connectors connector create
Creates a new connector and links it to an Elasticsearch index. When executing the command you will be asked to provide a connector configuration based on the service type you selected. For instance, you will be asked for host, username, and password if you select `mysql`.

To bypass interactive mode you can pass `--from-file` argument pointing to a key-value JSON file with connectors configuration.

Examples:

```bash
./bin/connectors connector create --index-name my-index --service-type sharepoint_online --index-language en --from-file sharepoint-config.json
```
This will create a new Sharepoint Online connector with an Elasticsearch index `my-index` and configuration from `sharepoint-config.json`.

**Note**
See the connectors' [source code](../connectors/sources) to get more information about their configuration fields.

#### connectors connector list

Lists all the existing connectors

Examples:

```bash
./bin/connectors connector list
```

It will display all existing connectors and the associated indices.

#### connectors job list
Lists all jobs and their stats. 
+
Examples:
```bash
./bin/connectors job list --
```

It will display all sync jobs including information like job status, number of indexed documents and index data volume associated with `connector_id`.

#### connectors job cancel
Marks the job as `cancelling` to let Connector services know that the job has to be canceled.

Examples:

```bash
./bin/connectors job cancel --
```

#### connectors job start
Schedules a new sync job and lets the Connector service pick it up.

Examples:

```bash
./bin/connectors job start -- -i -t -o
```

It will schedule a new sync job using job type and connector id. The outcome of the command contains a job id.

#### connectors job view
Shows information about a sync job.

Examples:

```bash
./bin/connectors job view -- -o