Skip to content

Commit

Permalink
Merge branch 'main' into artem/make-mongodb-work-without-admin-access
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-shelkovnikov authored Jan 9, 2024
2 parents eb8c662 + 64d9692 commit 4bb11fd
Show file tree
Hide file tree
Showing 14 changed files with 435 additions and 72 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ The framework serves two distinct, but related use cases:
- [Elastic-internal guide](docs/INTERNAL.md)
- [Connector Protocol](docs/CONNECTOR_PROTOCOL.md)
- [Configuration](docs/CONFIG.md)
- [Command line interface](docs/CLI.md)
- [Contribution guide](docs/CONTRIBUTING.md)
- [Upgrading](docs/UPGRADING.md)
2 changes: 1 addition & 1 deletion connectors/connectors_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def prompt():


# Index group
@click.group(invoke_without_command=False, help="Search indices management")
@click.pass_obj
def index(obj):
    # Container group for index subcommands; invoke_without_command=False
    # makes a bare `connectors index` show help instead of running silently.
    pass
Expand Down
1 change: 1 addition & 0 deletions connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ async def clean_index(self):
index=self.index_name,
body={"query": {"match_all": {}}},
ignore_unavailable=True,
conflicts="proceed",
)

async def update(self, doc_id, doc, if_seq_no=None, if_primary_term=None):
Expand Down
53 changes: 39 additions & 14 deletions connectors/sources/azure_blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, configuration):
self.connection_string = None
self.retry_count = self.configuration["retry_count"]
self.concurrent_downloads = self.configuration["concurrent_downloads"]
self.containers = self.configuration["containers"]

def tweak_bulk_options(self, options):
"""Tweak bulk options as per concurrent downloads support by azure blob storage
Expand Down Expand Up @@ -73,11 +74,17 @@ def get_default_configuration(cls):
"order": 3,
"type": "str",
},
"containers": {
"display": "textarea",
"label": "Azure Blob Storage containers",
"order": 4,
"type": "list",
},
"retry_count": {
"default_value": DEFAULT_RETRY_COUNT,
"display": "numeric",
"label": "Retries per request",
"order": 4,
"order": 5,
"required": False,
"type": "int",
"ui_restrictions": ["advanced"],
Expand All @@ -86,7 +93,7 @@ def get_default_configuration(cls):
"default_value": MAX_CONCURRENT_DOWNLOADS,
"display": "numeric",
"label": "Maximum concurrent downloads",
"order": 5,
"order": 6,
"required": False,
"type": "int",
"ui_restrictions": ["advanced"],
Expand All @@ -97,7 +104,7 @@ def get_default_configuration(cls):
"use_text_extraction_service": {
"display": "toggle",
"label": "Use text extraction service",
"order": 6,
"order": 7,
"tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.",
"type": "bool",
"ui_restrictions": ["advanced"],
Expand Down Expand Up @@ -196,22 +203,39 @@ async def blob_download_func(self, blob_name, container_name):
async for content in data.chunks():
yield content

async def get_container(self, container_list):
    """Yield container documents from Azure Blob Storage via the base client.

    Only containers whose names appear in ``container_list`` are yielded;
    a ``"*"`` entry acts as a wildcard that matches every container.

    Args:
        container_list (list): Configured container names (may contain "*").

    Yields:
        dictionary: Container document with name & meta data
    """
    container_set = set(container_list)
    async with BlobServiceClient.from_connection_string(
        conn_str=self.connection_string, retry_total=self.retry_count
    ) as azure_base_client:
        try:
            async for container in azure_base_client.list_containers(
                include_metadata=True
            ):
                if "*" in container_set or container["name"] in container_set:
                    yield {
                        "name": container["name"],
                        "metadata": container["metadata"],
                    }
                if "*" not in container_set and container["name"] in container_set:
                    container_set.remove(container["name"])
                    # Every explicitly configured container has been seen;
                    # stop listing early.
                    if not container_set:
                        return
            # Anything left over was configured but never returned by Azure.
            if container_set and "*" not in container_set:
                self._logger.warning(
                    f"Container(s) {','.join(container_set)} are configured but not found."
                )
        except Exception as exception:
            # Best-effort: log and keep the sync alive rather than failing it.
            self._logger.warning(
                f"Something went wrong while fetching containers. Error: {exception}"
            )

async def get_blob(self, container):
"""Get blobs for a specific container from Azure Blob Storage via container client
Expand Down Expand Up @@ -243,6 +267,7 @@ async def get_docs(self, filtering=None):
Yields:
dictionary: Documents from Azure Blob Storage
"""
async for container in self.get_container():
async for blob in self.get_blob(container=container):
yield blob, partial(self.get_content, blob)
async for container_data in self.get_container(container_list=self.containers):
if container_data:
async for blob in self.get_blob(container=container_data):
yield blob, partial(self.get_content, blob)
20 changes: 18 additions & 2 deletions connectors/sources/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,9 +1196,25 @@ async def _remote_validation(self):
_, headers = await self.github_client.post( # pyright: ignore
query_data=query_data, need_headers=True
)
if "repo" not in headers.get("X-OAuth-Scopes"): # pyright: ignore
msg = "Configured token does not have required rights to fetch the content"

scopes = headers.get("X-OAuth-Scopes", "")
scopes = {scope.strip() for scope in scopes.split(",")}
required_scopes = {"repo", "user", "read:org"}

for scope in ["write:org", "admin:org"]:
if scope in scopes:
scopes.add("read:org")

if required_scopes.issubset(scopes):
extra_scopes = scopes - required_scopes
if extra_scopes:
self._logger.warning(
"The provided token has higher privileges than required. It is advisable to run the connector with least privielged token. Required scopes are 'repo', 'user', and 'read:org'."
)
else:
msg = "Configured token does not have required rights to fetch the content. Required scopes are 'repo', 'user', and 'read:org'."
raise ConfigurableFieldValueError(msg)

if self.github_client.repos != [WILDCARD]:
invalid_repos = await self.get_invalid_repos()
if invalid_repos:
Expand Down
38 changes: 24 additions & 14 deletions connectors/sources/google_cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,17 @@ def get_default_configuration(cls):
"""

return {
"buckets": {
"display": "textarea",
"label": "Google Cloud Storage buckets",
"order": 1,
"type": "list",
},
"service_account_credentials": {
"display": "textarea",
"label": "Google Cloud service account JSON",
"sensitive": True,
"order": 1,
"order": 2,
"type": "str",
},
"use_text_extraction_service": {
Expand Down Expand Up @@ -251,20 +257,26 @@ async def ping(self):
)
raise

async def fetch_buckets(self, buckets):
    """Fetch the buckets from the Google Cloud Storage.

    Args:
        buckets (List): Configured bucket names; "*" means all buckets.

    Yields:
        Dictionary: Contains the list of fetched buckets from Google Cloud Storage.
    """
    if "*" in buckets:
        # Wildcard: enumerate every bucket through the Storage API.
        async for bucket in self._google_storage_client.api_call(
            resource="buckets",
            method="list",
            full_response=True,
            project=self._google_storage_client.user_project_id,
            userProject=self._google_storage_client.user_project_id,
        ):
            yield bucket
    else:
        # Explicit list: synthesize API-shaped documents locally,
        # avoiding a buckets.list call (and the permission it needs).
        for bucket in buckets:
            yield {"items": [{"id": bucket, "name": bucket}]}

async def fetch_blobs(self, buckets):
"""Fetches blobs stored in the bucket from Google Cloud Storage.
Expand Down Expand Up @@ -378,11 +390,9 @@ async def get_docs(self, filtering=None):
Yields:
dictionary: Documents from Google Cloud Storage.
"""
async for buckets in self.fetch_buckets():
if not buckets.get("items"):
continue
async for bucket in self.fetch_buckets(self.configuration["buckets"]):
async for blobs in self.fetch_blobs(
buckets=buckets,
buckets=bucket,
):
for blob_document in self.get_blob_document(blobs=blobs):
yield blob_document, partial(self.get_content, blob_document)
22 changes: 15 additions & 7 deletions connectors/sources/google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,13 @@ async def list_files(self, fetch_permissions=False):
yield file

async def list_files_from_my_drive(self, fetch_permissions=False):
"""Get files from Google Drive. Files can have any type.
"""Retrieves files from Google Drive, with an option to fetch permissions (DLS).
This function optimizes the retrieval process based on the 'fetch_permissions' flag.
If 'fetch_permissions' is True, the function filters for files the user can edit
("trashed=false and 'me' in writers") as permission fetching requires write access.
If 'fetch_permissions' is False, it simply filters out trashed files ("trashed=false"),
allowing a broader file retrieval.
Args:
include_permissions (bool): flag to select permissions in the request query
Expand All @@ -181,17 +187,19 @@ async def list_files_from_my_drive(self, fetch_permissions=False):
dict: Documents from Google Drive.
"""

files_fields = (
DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
if fetch_permissions
else DRIVE_ITEMS_FIELDS
)
if fetch_permissions:
files_fields = DRIVE_ITEMS_FIELDS_WITH_PERMISSIONS
# Google Drive API required write access to fetch file's permissions
list_query = "trashed=false and 'me' in writers"
else:
files_fields = DRIVE_ITEMS_FIELDS
list_query = "trashed=false"

async for file in self.api_call_paged(
resource="files",
method="list",
corpora="user",
q="trashed=false",
q=list_query,
orderBy="modifiedTime desc",
fields=f"files({files_fields}),incompleteSearch,nextPageToken",
includeItemsFromAllDrives=False,
Expand Down
133 changes: 133 additions & 0 deletions docs/CLI.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
Connectors CLI is a command-line interface to Elastic Connectors for use in your terminal or your scripts.

**Warning:**
Connectors CLI is in tech preview.

- [Installation](#installation)
- [Configuration](#configuration)
- [Available commands](#available-commands)
- [Known bugs and limitations](#known-bugs-and-limitations)

## Installation
1. Clone the repository `git clone https://github.com/elastic/connectors.git`
2. Run `make clean install` to install dependencies and create executable files.
3. Connectors CLI is available via `./bin/connectors`

## Configuration
**Note:** Make sure your Elasticsearch instance is up and running.

1. Run `./bin/connectors login` to authenticate the CLI with an Elasticsearch instance.
2. Provide credentials
3. The command will create or ask to rewrite an existing configuration file in `./cli/config.yml`

When you run any command you can specify a configuration file using `-c` argument.
Example:

```bash
./bin/connectors -c <config-file-path.yml> connector list
```

## Available commands
### Getting help
Connectors CLI provides `--help` argument that can be used with any command to get more information.

For example:
```bash
./bin/connectors --help


Usage: connectors [OPTIONS] COMMAND [ARGS]...

Options:
-v, --version Show the version and exit.
-c, --config FILENAME
--help Show this message and exit.

Commands:
connector Connectors management
index Search indices management
job Sync jobs management
login Authenticate Connectors CLI with an Elasticsearch instance
```

### Commands list

- [connectors connector create](#connectors-connector-create)
- [connectors connector list](#connectors-connector-list)
- [connectors job list](#connectors-job-list)
- [connectors job cancel](#connectors-job-cancel)
- [connectors job start](#connectors-job-start)
- [connectors job view](#connectors-job-view)


#### connectors connector create
Creates a new connector and links it to an Elasticsearch index. When executing the command you will be asked to provide a connector configuration based on the service type you selected. For instance, you will be asked for host, username, and password if you select `mysql`.

To bypass interactive mode you can pass `--from-file` argument pointing to a key-value JSON file with connectors configuration.

Examples:

```bash
./bin/connectors connector create --index-name my-index --service-type sharepoint_online --index-language en --from-file sharepoint-config.json
```
This will create a new Sharepoint Online connector with an Elasticsearch index `my-index` and configuration from `sharepoint-config.json`.

**Note**
See the connectors' [source code](../connectors/sources) to get more information about their configuration fields.

#### connectors connector list

Lists all the existing connectors

Examples:

```bash
./bin/connectors connector list
```

It will display all existing connectors and the associated indices.

#### connectors job list
Lists all jobs and their stats.

Examples
```bash
./bin/connectors job list -- <connector_id>
```

It will display all sync jobs including information like job status, number of indexed documents and index data volume associated with `connector_id`.

#### connectors job cancel
Marks the job as `cancelling` to let Connector services know that the job has to be canceled.

Examples:

```bash
./bin/connectors job cancel -- <job_id>
```

#### connectors job start
Schedules a new sync job and lets Connector service pick it up.

Examples:

```bash
./bin/connectors job start -- -i <connector_id> -t <job_type{full,incremental,access_control}> -o <format{text,json}>
```

It will schedule a new sync job using job type and connector id. The outcome of the command contains a job id.

#### connectors job view
Shows information about a sync job.

Examples:

```bash
./bin/connectors job view -- <job_id> -o <format{text,json}>
```

It will display information about the job including job id, connector id, indexed document counts and index data value.

## Known bugs and limitations
1. Does not support Elasticsearch API keys for authentication.
2. Does not support all Kibana UI functionality.
Loading

0 comments on commit 4bb11fd

Please sign in to comment.