diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml
index 21c00155..f02d57f6 100644
--- a/.github/workflows/docs.yaml
+++ b/.github/workflows/docs.yaml
@@ -31,6 +31,8 @@ jobs:
         poetry install -vv --no-interaction
         poetry show -vv
     - name: Build docs
+      env:
+        USE_AGG_MDS: true  # so the aggregate MDS docs are added
       run: poetry run python run.py openapi
     - uses: stefanzweifel/git-auto-commit-action@v4.1.2
diff --git a/.secrets.baseline b/.secrets.baseline
index b7107886..f21d3f0f 100644
--- a/.secrets.baseline
+++ b/.secrets.baseline
@@ -3,7 +3,7 @@
     "files": null,
     "lines": null
   },
-  "generated_at": "2022-10-03T18:10:28Z",
+  "generated_at": "2022-10-12T21:15:02Z",
   "plugins_used": [
     {
       "name": "AWSKeyDetector"
@@ -70,7 +70,7 @@
       {
         "hashed_secret": "6eae3a5b062c6d0d79f070c26e6d62486b40cb46",
         "is_verified": false,
-        "line_number": 36,
+        "line_number": 62,
         "type": "Secret Keyword"
       }
     ],
@@ -78,7 +78,7 @@
       {
         "hashed_secret": "bf7e894868fd96c11edf05ef7d23122cbfa22e7e",
         "is_verified": false,
-        "line_number": 60,
+        "line_number": 204,
         "type": "Hex High Entropy String"
       }
     ],
@@ -106,14 +106,6 @@
         "type": "Hex High Entropy String"
       }
     ],
-    "tests/test_agg_mds_adapters.py": [
-      {
-        "hashed_secret": "143e9f2aca10dbd2711cb96047f4016f095e5709",
-        "is_verified": false,
-        "line_number": 3898,
-        "type": "Hex High Entropy String"
-      }
-    ],
     "tests/test_migrations.py": [
       {
         "hashed_secret": "4dcba4ad1d671981e2d211ebe56da8a5b40f14ef",
diff --git a/README.md b/README.md
index 86735ac9..2d9e3c24 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,119 @@ The server is built with [FastAPI](https://fastapi.tiangolo.com/) and packaged w
 The documentation can be browsed in the [docs](docs) folder, and key documents are linked below.
 
-* [Detailed API Documentation](http://petstore.swagger.io/?url=https://raw.githubusercontent.com/uc-cdis/metadata-service/master/docs/openapi.yaml)
-* [Development and deployment](docs/dev.md)
-* [Aggregate Metadata Service](docs/agg_mds.md)
+The aggregated MDS APIs and scripts copy metadata from one or many metadata services into a single data store. This enables a metadata service to act as a central API for browsing metadata using clients such as the Ecosystem browser.
+
+The aggregate metadata APIs and migrations are disabled by default unless `USE_AGG_MDS=true` is specified. `AGG_MDS_NAMESPACE` should also be defined for shared Elasticsearch environments so that a unique index is used per instance.
+
+The aggregate cache is built using Elasticsearch. See the `docker-compose.yaml` file (specifically the `aggregate_migration` service) for details on how aggregate data is populated.
+
+## Installation
+
+Install required software:
+
+* [PostgreSQL](https://www.postgresql.org/) 9.6 or above
+* [Python](https://www.python.org/downloads/) 3.9 or above
+* [Poetry](https://poetry.eustace.io/docs/#installation)
+
+Then use `poetry install` to install the dependencies. Using a
+[virtualenv](https://virtualenv.pypa.io/) is recommended.
+If you don't manage your own, Poetry will create one for you
+during `poetry install`, and you can activate it with:
+
+```bash
+poetry shell
+```
+
+## Development
+
+Create a file `.env` in the root directory of the checkout
+(uncomment a line to override its default):
+
+```python
+# DB_HOST = "..."           # default: localhost
+# DB_PORT = ...             # default: 5432
+# DB_USER = "..."           # default: current user
+# DB_PASSWORD = "..."       # default: empty
+# DB_DATABASE = "..."       # default: current user
+# USE_AGG_MDS = "..."       # default: false
+# AGG_MDS_NAMESPACE = "..." # default: default_namespace
+# GEN3_ES_ENDPOINT = "..."  # default: empty
+```
+
+Run database schema migration:
+
+```bash
+alembic upgrade head
+```
+
+Run the server with auto-reloading:
+
+```bash
+python run.py
+```
+
+Try out the API at: <http://localhost:8000/docs>.
+
+## Run tests
+
+Please note that the name of the test database is prefixed with `test_`; you
+need to create that database first:
+
+```bash
+psql
+CREATE DATABASE test_metadata;
+```
+
+```bash
+pytest --cov=src --cov=migrations/versions tests
+```
+
+## Develop with Docker
+
+Use Docker Compose:
+
+```bash
+docker-compose up
+```
+
+Run the database schema migration as well:
+
+```bash
+docker-compose exec app alembic upgrade head
+```
+
+Run tests:
+
+```bash
+docker-compose exec app pytest --cov=src --cov=migrations/versions tests
+```
+
+### Aggregate MDS
+
+To test populating the aggregate MDS:
+
+```bash
+python src/mds/populate.py --config <config file> --hostname localhost --port 9200
+```
+
+Then view the loaded data at:
+
+```
+http://localhost:8000/aggregate/metadata?limit=1000
+```
+
+## Deployment
+
+For production, use [gunicorn](https://gunicorn.org/):
+
+```bash
+gunicorn mds.asgi:app -k uvicorn.workers.UvicornWorker -c gunicorn.conf.py
+```
+
+Or use the Docker image built from the `Dockerfile`, using environment variables
+with the same name to configure the server.
+
+Other than database configuration, please also set:
+
+```bash
+DEBUG=0
+ADMIN_LOGINS=alice:123,bob:456
+```
+
+Needless to say, don't use `123` or `456` as actual passwords.
diff --git a/configs/brh_config.json b/configs/brh_config.json
deleted file mode 100644
index eb25e9d1..00000000
--- a/configs/brh_config.json
+++ /dev/null
@@ -1,130 +0,0 @@
-{
-    "gen3_commons": {
-        "IBD Commons": {
-            "mds_url": "https://ibdgc.datacommons.io",
-            "commons_url" : "ibdgc.datacommons.io",
-            "study_data_field" : "my_metadata",
-            "guid_type" : "my_metadata",
-            "columns_to_fields": {
-                "_subjects_count" : "subjects_count",
-                "study_description" : "brief_summary",
-                "short_name": "dataset_title",
-                "full_name": "dataset_title"
-            }
-        },
-        "BioData Catalyst": {
-            "mds_url": "https://gen3.biodatacatalyst.nhlbi.nih.gov",
-            "commons_url" : "gen3.biodatacatalyst.nhlbi.nih.gov",
-            "columns_to_fields": {
-                "short_name": "name",
-                "_unique_id" : "study_id"
-            }
-        },
-        "MIDRC": {
-            "mds_url": "https://data.midrc.org",
-            "commons_url" : "data.midrc.org",
-            "study_data_field" : "discovery_metadata",
-            "columns_to_fields": {
-                "_subjects_count" : "cases_count",
-                "study_description" : "research_description",
-                "_unique_id": "study_id"
-            }
-        },
-        "NIAID ClinicalData": {
-            "mds_url": "https://accessclinicaldata.niaid.nih.gov",
-            "commons_url" : "accessclinicaldata.niaid.nih.gov",
-            "study_data_field" : "my_metadata",
-            "guid_type" : "my_metadata",
-            "columns_to_fields": {
-                "full_name": "title",
-                "study_id" : "nct_number",
-                "_unique_id": "nct_number",
-                "study_description" : "brief_summary"
-            }
-        },
-        "JCOIN": {
-            "mds_url": "https://jcoin.datacommons.io/",
-            "commons_url" : "jcoin.datacommons.io/",
-            "columns_to_fields": {
-                "_subjects_count" : "subjects",
-                "study_description" : "summary",
-                "short_name": "study_name",
-                "full_name": "study_name"
-            }
-        },
-        "AnVIL": {
-            "mds_url": "https://internalstaging.theanvil.io",
-            "commons_url": "gen3.theanvil.io",
-            "columns_to_fields": {
-                "name": "name",
-                "full_name": "full_name",
-                "_subjects_count" : "_subjects_count",
-                "_unique_id" : "study_id",
-                "study_description" : "study_description"
-            }
-        },
-        "Genomic Data Commons": {
-            "mds_url": "https://gen3.datacommons.io",
-            "commons_url":
"portal.gdc.cancer.gov", - "study_data_field" : "discovery_metadata", - "columns_to_fields": { - "_subjects_count" : "subjects_count", - "dbgap_accession_number" : "study_id", - "study_description" : "description" - }, - "select_field": { - "field_name" : "commons" , - "field_value" : "Genomic Data Commons" - } - }, - "Proteomic Data Commons": { - "mds_url": "https://gen3.datacommons.io", - "commons_url": "proteomic.datacommons.cancer.gov/pdc", - "columns_to_fields": { - "_subjects_count" : "cases_count", - "study_id" : "_unique_id", - "description" : "study_description" - }, - "select_field": { - "field_name" : "commons" , - "field_value" : "Proteomic Data Commons" - } - }, - "Cancer Imaging Data Commons": { - "mds_url": "https://gen3.datacommons.io", - "commons_url": "imaging.datacommons.cancer.gov/", - "columns_to_fields": { - "_subjects_count" : "cases_count", - "study_id" : "_unique_id", - "description" : "study_description" - }, - "select_field": { - "field_name" : "commons" , - "field_value" : "Cancer Imaging Data Commons" - } - } - }, - "adapter_commons": { - "Kids First": { - "mds_url": "https://gen3staging.kidsfirstdrc.org/", - "commons_url": "kidsfirstdrc.org", - "adapter": "gen3", - "config" : { - "guid_type": "metadata_object", - "study_field": "dbgap" - }, - "keep_original_fields": false, - "field_mappings" : { - "authz": "path:authz", - "tags": "path:gen3_discovery.tags", - "_unique_id": "path:_unique_id", - "study_id": "path:_unique_id", - "study_description": "path:description", - "full_name": "path:full_name", - "short_name": "path:full_name", - "commons": "Kids First Data Resource Center", - "study_url": "path:link" - } - } - } -} diff --git a/docker-compose.yml b/docker-compose.yml index 1256f94b..c721ed22 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,7 +36,7 @@ services: environment: - USE_AGG_MDS=true - GEN3_ES_ENDPOINT=http://esproxy-service:9200 - command: sh -c 'while [[ "$$(curl --connect-timeout 2 -s -o /dev/null -w ''%{http_code}'' $$GEN3_ES_ENDPOINT)" != "200" ]]; do echo "wait for " $$GEN3_ES_ENDPOINT; sleep 5; done; echo es backend is available;/env/bin/python /src/src/mds/populate.py --config /src/tests/config.json' + command: bash -c 'while [[ "$$(curl --connect-timeout 2 -s -o /dev/null -w ''%{http_code}'' $$GEN3_ES_ENDPOINT)" != "200" ]]; do echo "wait for " $$GEN3_ES_ENDPOINT; sleep 5; done; echo es backend is available;/env/bin/python /src/src/mds/populate.py --config /src/tests/config.json' db: image: postgres environment: diff --git a/docs/metadata_adapters.md b/docs/aggregate_metadata.md similarity index 54% rename from docs/metadata_adapters.md rename to docs/aggregate_metadata.md index f2b0496a..17bdb474 100644 --- a/docs/metadata_adapters.md +++ b/docs/aggregate_metadata.md @@ -1,4 +1,4 @@ -# Gen3 Metadata Adapters +# Configuring the Gen3 Aggregate Metadata Service and Adapters Ingesting data into an Aggregate Metadata Service from a remote Metadata Service is handled by an adapter. An adapter is used to interface with a @@ -8,7 +8,9 @@ either when it is started or on-demand. The adapter assists in the ETL process to pull, cleanup and normalize metadata before it is loaded. The adapters are configurable by a JSON object which is described below. 
-![](assets/metadata-adapters-fig.png)*Adapters enable pulling metadata from a remote service*
+![](assets/metadata-adapters-fig.png)
+
+*Adapters enable pulling metadata from a remote service*
 
 ## Introduction
 
@@ -29,6 +31,147 @@ The adapter works in the following order of operations:
 ## Configuration
 
 A metadata service is configurable via a JSON object, with the following format:
+```json lines
+{
+  "configuration": {
+    "schema": {
+      ...
+    },
+    "settings": {
+      ...
+    }
+  },
+  "adapter_commons": {
+    ... per-adapter configuration, described below ...
+  }
+}
+```
+### Schema
+
+The schema section is optional. It gives you finer control over the Elasticsearch backend and, if defined,
+allows for schema introspection via a JSON schema.
+A schema is of the form:
+
+```json lines
+  "schema": {
+    "__manifest": {
+      "description": "an array of file names (usually DRS ids) and their sizes",
+      "type": "array",
+      "properties": {
+        "file_name": {
+          "type": "string"
+        },
+        "file_size": {
+          "type": "integer"
+        }
+      }
+    },
+    "commons_url": {},
+    "short_name": {
+      "default" : "not_set"
+    },
+    "tags": {
+      "type": "array"
+    },
+```
+
+Each field can be defined with a data type, a description, and a default value. All are optional; the default type is ```string```. Note that any field used in an adapter field mapping section (below) but NOT defined in the
+schema will be added and auto-typed by Elasticsearch. The purpose of the schema is to provide a way to explicitly type fields, especially nested objects (for example ```__manifest``` above).
+It also allows a default value to be defined, which the aggregate metadata field will be set to if the value is not present in a metadata object.
+The schema also supports introspection, returning a JSON schema form via the ```info``` API call:
+
+```
+http://localhost:8000/aggregate/info/schema
+```
+
+```json
+{
+  "_subjects_count": {
+    "type": "integer",
+    "description": ""
+  },
+  "__manifest": {
+    "type": "array",
+    "properties": {
+      "file_name": {
+        "type": "string",
+        "description": ""
+      },
+      "file_size": {
+        "type": "integer",
+        "description": ""
+      }
+    },
+    "description": "an array of file names (usually DRS ids) and their sizes"
+  },
+  "tags": {
+    "type": "array",
+    "description": ""
+  },
+  "_unique_id": {
+    "type": "string",
+    "description": ""
+  },
+  "study_description": {
+    "type": "string",
+    "description": ""
+  },
+  "study_id": {
+    "type": "string",
+    "description": ""
+  },
+  "study_url": {
+    "type": "string",
+    "description": ""
+  },
+  "project_id": {
+    "type": "string",
+    "description": ""
+  },
+  "short_name": {
+    "type": "string",
+    "description": "",
+    "default": "not_set"
+  },
+  "year": {
+    "type": "string",
+    "description": "",
+    "default": "not_set"
+  },
+  "full_name": {
+    "type": "string",
+    "description": ""
+  },
+  "commons_url": {
+    "type": "string",
+    "description": ""
+  },
+  "commons": {
+    "type": "string",
+    "description": ""
+  }
+}
+```
+
+### Settings
+
+#### DRS Caching
+* **cache_drs** ```[true|false]``` - if set to true, the adapter will
+connect to dataguids.org and cache the DRS directory information. This information is available via the
+```info``` API endpoint:
+```
+http://localhost:8000/aggregate/info/dg.H35L
+```
+```
+{
+  "host": "externaldata.healdata.org",
+  "name": "External Data HEAL",
+  "type": "indexd"
+}
+```
+
+## Adapter Configuration
+
+The ```adapter_commons``` section of the configuration file defines where the aggregate metadata service will pull data from.
+There can be any number of adapters; in fact, a single Gen3 commons can be queried more than once by defining different adapter settings.
 
 ```json
 {
@@ -49,7 +192,8 @@ A metadata service is configurable via a JSON object, with the following format:
         "location": "path:coverage[0]",
         "summary": {
           "path":"description",
-          "filters": ["strip_html"]
+          "filters": ["strip_html"],
+          "default" : "N/A"
         },
         ...
       },
@@ -75,46 +219,82 @@ A metadata service is configurable via a JSON object, with the following format:
 ```
 *A sample configuration file*
 
+A fully working configuration file that pulls sample data from [gen3.datacommons.io](https://gen3.datacommons.io) is available [here](sample_aggregate_mds_config.json).
+
 Any number of adapters can be added to a configuration file as long as the
-key per adapter is unique.
+commons name (used as a key) per adapter is unique.
 
 ### Parameters
 
 The parameters of an adapter are:
 * ```mds_url```: the URL of the metadata service API.
 * ```commons_url```: the URL of the homepage of the metadata source
+* ```commons_name```: overrides the commons name. Typically, the commons is named using the entry name for the adapter (ICPSR in the above config file). However, there are cases where
+using a different name is preferred. For example, if one or more adapters are assigned the same name,
+all of their entries will be added under that commons name in the aggregate MDS. This can be used to have multiple adapters
+pull data from the same source but with different mappings or filtering operations.
 * ```adapter```: registered name of the adapter, used to bind a particular adapter to a site. NOTE: there is no checking to ensure that the correct adapters are being used. Usually, in the case of a mismatch, errors are logged and nothing is pulled.
-* ```config```: an object defining any additional parameter needed for the adapter.
-* ```filters```: the parameters (or filter
+* ```config```: an object defining any additional parameters needed for an adapter (see Gen3 Adapter below).
+* ```filters```: the parameters (or filter
 properties) passed to the adapter, this is adapter specific. In the
-above example, the ```study_id``` parameter is selecting which study ids to
-pull from ICPSR.
+above example, the ```study_id``` parameter for the ICPSR adapter is used to select which study ids to
+pull from ICPSR. Note that adapters themselves can have filtering options; these are
+provided as a backup if no other filtering option is available.
+
+#### Adapter Settings
+
+* **keep_original_fields** ```[true|false]``` - allows the adapter to keep all of the original
+fields in a study when loading. If set to true, any field that is already defined and processed will be updated to
+the processed value.
+
+Sometimes a need arises to filter entries based on a field value. The ```select_field```
+config provides a way to filter out data that does NOT match. The settings are:
+
+* **field_name** - the field name to filter on. Note that the filter is executed
+after the data has been processed, so the value needs to use the mapped or normalized name
+* **field_value** - set to a string. Any entries NOT matching this value will not be added.
+
+A sample:
+```
+    ...
+    "select_field": {
+        "field_name": "data_resource",
+        "field_value": "SAMHDA"
+    },
+    ...
+```
+
 ### Field Mappings
 
 The next section of the configuration is the field mappings, which map a field
 name from the remote metadata into a standard name. This process is also called normalization.
 The mapping is simply the name of the normalized field (what is stored in the Aggregate Metadata Service) to the remote field. Think of it as ```AggMDS field = Remote Field```. While this works for simple cases, there are many instances where the field is deeper in a JSON object. To resolve this you can specify a **path selector**, as shown in the sketch below.
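+As an illustration (the remote field names here are hypothetical), a minimal mapping block combines the three
+mapping forms: a constant value (assigned as-is), a simple ```path:``` selector, and the long form with filters
+and a default (both explained below):
+
+```json lines
+    "field_mappings" : {
+        "commons": "My Data Commons",
+        "short_name": "path:name",
+        "summary": {
+            "path": "description",
+            "filters": ["strip_html"],
+            "default": "N/A"
+        }
+    }
+```
+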
 
-### Selectors
+#### Selectors
 
 A path from the start (or root) of a remote metadata field can be described using [JSON path syntax](https://tools.ietf.org/id/draft-goessner-dispatch-jsonpath-00.html). JSON path can be used by prefixing ```path:``` to a JSON path expression for the field you want to get the value of. For example, if you wanted to get the first official name in the array OverallOfficial, the selector would be ```path:OverallOfficial[0].OverallOfficialName```. You can experiment with JSON paths using a [JSON Path editor](https://jsonpath.com/).
 
-### Filters
+#### Filters
 
 The above methods should allow you to pull any nested value from a metadata entry. There are also cases where the metadata needs further processing or filtering. While this can be done in Python by writing a new adapter or extending an existing one, there is also the option to apply a filter. A filter can be added to a field using the long mapping form:
 ```json
 "summary": {
    "path":"description",
-   "filters": ["strip_html"]
+   "filters": ["strip_html"],
+   "default" : "N/A"
 }
 ```
-In this case, the ```summary``` is set to a JSON object which optionally defines a JSON path and an array of one or more filters to apply. The filters are applied to the text value of the remote field. Furthermore, the filters are applied in the order they appear. The current set of filters are:
+In this case, the ```summary``` is set to a JSON object which optionally defines:
+* a JSON path
+* an array of one or more filters to apply
+* a default value to set if the field is not found
+
+The filters are applied to the text value of the remote field, in the order they appear. The current set of filters is:
 
-* strip_html: remove HTML tags from a text field
-* strip_email: remove email addresses from a text field
-* add_icpsr_source_url: creates a url to the study data on ICPSR
-* add_clinical_trials_source_url: creates a url to the study on clinicaltrials.gov
+* **strip_html**: removes HTML tags from a text field
+* **strip_email**: removes email addresses from a text field
+* **add_icpsr_source_url**: creates a URL to the study data on ICPSR
+* **add_clinical_trials_source_url**: creates a URL to the study on clinicaltrials.gov
 
 You can add your own filters, and register them by creating a python function with the signature:
 ```python
@@ -122,8 +302,36 @@ def filter_function(s:str) -> str:
   ...
 ```
 
-### Default Values
-Defining default values for fields is quite simple: define the normalized field name and a value. If a remote metadata field has a value, it will override the default.
+#### Default Values
+Defining default values for fields is handled in one of two ways.
+If a field in the metadata does not need a path, simply define the
+field name and a value. If a remote metadata field has a value, it will override the default.
+If a path is used, use the longer form and set the ```default``` to use
+if the path is not found.
+The longer form of a field mapping is:
+```json lines
+    "summary": {
+        "path": "description",
+        "filters": ["strip_html"],
+        "default": "N/A"
+    },
+```
+where:
+* ```path```: the JSON path to the field
+* ```filters```: the list of filters to apply (optional)
+* ```default```: the value to set the field to if the path does not resolve (also optional)
 
 ### Per Item Overrides
 
@@ -259,7 +467,53 @@ The code above does the following:
 
 4. Return the results
 
-While the Adapters use Object Oriented Programming, you actually do not need to extend from the classes as long as you create a class with the above signature you should be fine.
+While the Adapters support Object-Oriented Programming, you do not actually need to extend from these classes; as long as you create a class with the above signature you should be fine.
 
 ### Adding your Adapter
 Adding your adapter and/or filters to be called by the populate process is still in the design phase. Currently, this requires adding the adapter code into the source code of the Gen3 metadata-service. However, shortly we will move to a plugin-based model.
+
+## Gen3 Adapter
+The Gen3 Adapter is used to interface with and retrieve data from a Gen3 data commons running a metadata-service.
+The configuration for a Gen3 commons is identical to what is described above. The **config** section provides a
+way to define which ```_guid_type``` and study field to read an entry from.
+
+### Configuring the metadata schema
+Note that Gen3 metadata is typically in this format:
+
+```json lines
+  "ds000030": {
+    "_guid_type": "discovery_metadata",
+    "gen3_discovery": { ...
+```
+The ```_guid_type``` and the study field usually default to
+```"discovery_metadata"``` and ```"gen3_discovery"```. However, this is not always the case.
+To account for these differences you can add the following to a Gen3 adapter config section,
+where ```guid_type``` sets the string for ```_guid_type``` and
+```study_field``` sets the name of the metadata field within
+the ```guid_type``` object:
+
+```json lines
+  "config" : {
+    "guid_type": "unregistered_discovery_metadata",
+    "study_field": "my_metadata"
+  },
+```
+This will then look for metadata entries such as:
+
+```json lines
+  "ds000030": {
+    "_guid_type": "unregistered_discovery_metadata",
+    "my_metadata": { ...
+```
+### Advanced filtering
+
+The Gen3 metadata-service supports filtering as described in its documentation. The Gen3 Adapter
+allows a filter option to be configured, which is passed on to the MDS. Specific studies can
+be pulled from the MDS by defining the filters.
+The filters are part of the config setting:
+```json lines
+    "config": {
+        "filters": "gen3_discovery.data_resource=SAMHDA"
+    },
+```
+Note that this can work together with the ```guid_type``` and ```study_field``` settings.
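+
+### Example: a custom filter
+
+To make the custom-filter hook from the Filters section concrete, here is a sketch of one. The name and behavior
+are hypothetical, and how it is registered with ```FieldFilters``` depends on the metadata-service version; it
+only illustrates the required signature and the convention (used by the built-in filters) of passing non-string
+values through unchanged:
+
+```python
+import re
+
+
+def collapse_whitespace(s: str) -> str:
+    """Hypothetical filter: collapse runs of whitespace into single spaces."""
+    if not isinstance(s, str):
+        # like the built-in filters, pass non-string values through unchanged
+        return s
+    return re.sub(r"\s+", " ", s).strip()
+```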
diff --git a/docs/openapi.yaml b/docs/openapi.yaml index 4a555a8d..2743ce66 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -92,7 +92,7 @@ components: type: http info: title: Framework Services Object Management Service - version: 1.10.0 + version: 1.11.0 openapi: 3.0.2 paths: /_status: @@ -108,6 +108,238 @@ paths: schema: {} description: Successful Response summary: Get Status + /aggregate/commons: + get: + description: "Returns a list of all commons with data in the aggregate metadata-service\n\ + \nExample:\n\n { commons: [\"commonsA\", \"commonsB\" ] }" + operationId: get_commons_aggregate_commons_get + responses: + '200': + content: + application/json: + schema: {} + description: Successful Response + summary: Get Commons + tags: + - Aggregate + /aggregate/info/{what}: + get: + description: "Returns status and configuration information about aggregate metadata\ + \ service.\n\nReturn configuration information. Currently supports only 1\ + \ information type:\n**schema**\n\nExample:\n\n {\n schema: {\n \ + \ ...\n ...\n }\n }" + operationId: get_commons_info_aggregate_info__what__get + parameters: + - in: path + name: what + required: true + schema: + title: What + type: string + responses: + '200': + content: + application/json: + schema: {} + description: Successful Response + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Get Commons Info + tags: + - Aggregate + /aggregate/metadata: + get: + description: "Returns metadata records\n\nReturns medata records namespaced\ + \ by commons as a JSON object.\nExample without pagination:\n\n {\n \ + \ \"commonA\" : {\n ... Metadata\n },\n \"commonB\"\ + \ : {\n ... Metadata\n }\n ...\n }\n\nThe pagination\ + \ option adds a pagination object to the response:\n\n {\n results:\ + \ {\n \"commonA\" : {\n ... Metadata\n },\n\ + \ \"commonB\" : {\n ... Metadata\n }\n \ + \ ...\n },\n \"pagination\": {\n \"hits\":\ + \ 64,\n \"offset\": 0,\n \"pageSize\": 20,\n \ + \ \"pages\": 4\n }\n }\n\nThe flatten option removes the commons'\ + \ namespace so all results are a child or results:\n\n results: {\n \ + \ ... Metadata from commons A\n ... Metadata from commons B\n\ + \ }\n ...\n },\n\n\nThe counts options when applied to an array\ + \ or dictionary will replace\nthe field value with its length. If the field\ + \ values is None it will replace it with 0.\nAll other types will be unchanged." + operationId: get_aggregate_metadata_aggregate_metadata_get + parameters: + - description: 'Maximum number of records returned. (e.g. max: 2000)' + in: query + name: limit + required: false + schema: + default: 20 + description: 'Maximum number of records returned. (e.g. max: 2000)' + title: Limit + type: integer + - description: Return results at this given offset. + in: query + name: offset + required: false + schema: + default: 0 + description: Return results at this given offset. + title: Offset + type: integer + - description: 'Return count of a field instead of the value if field is an + array otherwise field is unchanged. If field is **null** will + set field to **0**. Multiple fields can be compressed by comma + separating the field names: **files,authors**' + in: query + name: counts + required: false + schema: + default: '' + description: 'Return count of a field instead of the value if field is an + array otherwise field is unchanged. If field is **null** will + set field to **0**. 
Multiple fields can be compressed by comma + separating the field names: **files,authors**' + title: Counts + type: string + - description: Return the results without grouping items by commons. + in: query + name: flatten + required: false + schema: + default: false + description: Return the results without grouping items by commons. + title: Flatten + type: boolean + - description: If true will return a pagination object in the response + in: query + name: pagination + required: false + schema: + default: false + description: If true will return a pagination object in the response + title: Pagination + type: boolean + responses: + '200': + content: + application/json: + schema: {} + description: Successful Response + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Get Aggregate Metadata + tags: + - Aggregate + /aggregate/metadata/guid/{guid}: + get: + description: "Returns a metadata record by GUID\n\nExample:\n\n { id2: {\ + \ name: \"bear\" } }" + operationId: get_aggregate_metadata_guid_aggregate_metadata_guid__guid__get + parameters: + - in: path + name: guid + required: true + schema: + title: Guid + type: string + responses: + '200': + content: + application/json: + schema: {} + description: Successful Response + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Get Aggregate Metadata Guid + tags: + - Aggregate + /aggregate/metadata/{name}: + get: + description: "et all metadata records from a commons by name\n\nReturns an array\ + \ containing all the metadata entries for a single commons.\nThere are no\ + \ limit/offset parameters.\n\nExample:\n\n [ { id2: { name: \"bear\" }\ + \ } , { id3: { name: \"cat\" } }]" + operationId: get_aggregate_metadata_for_commons_aggregate_metadata__name__get + parameters: + - in: path + name: name + required: true + schema: + title: Name + type: string + responses: + '200': + content: + application/json: + schema: {} + description: Successful Response + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Get Aggregate Metadata For Commons + tags: + - Aggregate + /aggregate/metadata/{name}/info: + get: + description: "Returns information from the named commons.\n\nExample:\n\n \ + \ { commons_url: \"gen3.datacommons.io\" }" + operationId: get_aggregate_metadata_commons_info_aggregate_metadata__name__info_get + parameters: + - in: path + name: name + required: true + schema: + title: Name + type: string + responses: + '200': + content: + application/json: + schema: {} + description: Successful Response + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Get Aggregate Metadata Commons Info + tags: + - Aggregate + /aggregate/tags: + get: + description: "Returns aggregate category, name and counts across all commons\n\ + \nExample:\n\n {\n \"Data Type\": {\n \"total\"\ + : 275,\n \"names\": [\n {\n \"Genotype\"\ + : 103,\n \"Clinical Phenotype\": 100,\n \"DCC\ + \ Harmonized\": 24,\n \"WGS\": 20,\n \"SNP/CNV\ + \ Genotypes (NGS)\": 6,\n \"RNA-Seq\": 5,\n \ + \ \"WXS\": 5,\n \"Targeted-Capture\": 3,\n \"\ + miRNA-Seq\": 3,\n \"CNV Genotypes\": 2\n }\n \ + \ ]\n }\n }" + operationId: get_aggregate_tags_aggregate_tags_get + responses: + '200': + content: + application/json: + schema: {} + 
description: Successful Response + summary: Get Aggregate Tags + tags: + - Aggregate /metadata: get: description: "Search the metadata.\n\nWithout filters, this will return all\ diff --git a/docs/sample_aggregate_mds_config.json b/docs/sample_aggregate_mds_config.json new file mode 100644 index 00000000..863d5210 --- /dev/null +++ b/docs/sample_aggregate_mds_config.json @@ -0,0 +1,67 @@ +{ + "configuration": { + "schema": { + "_subjects_count": { + "type": "integer" + }, + "__manifest": { + "description": "an array of filename (usually DRS ids and its size", + "type": "array", + "properties": { + "file_name": { + "type": "string" + }, + "file_size": { + "type": "integer" + } + } + }, + "tags": { + "type": "array" + }, + "_unique_id": {}, + "study_description": {}, + "study_id": {}, + "study_url": {}, + "project_id": {}, + "short_name": { + "default" : "not_set" + }, + "year": { + "default" : "not_set" + }, + "full_name": {}, + "commons_url": {}, + "commons" : {} + }, + "settings" : { + "cache_drs" : true + } + }, + "adapter_commons": { + "Gen3": { + "mds_url": "https://gen3.datacommons.io/", + "commons_url": "gen3.datacommons.io/", + "adapter": "gen3", + "config" : { + "guid_type": "discovery_metadata", + "study_field": "gen3_discovery" + }, + "keep_original_fields": false, + "field_mappings" : { + "tags": "path:tags", + "_unique_id": "path:_unique_id", + "study_description": "path:summary", + "full_name": "path:study_title", + "short_name": "path:short_name", + "year": "path:year", + "accession_number": "path:accession_number", + "commons": "Gen3 Data Commons", + "study_url": { + "path": "link", + "default": "unknown" + } + } + } + } +} diff --git a/pyproject.toml b/pyproject.toml index 126cc708..ab60c816 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "mds" -version = "1.10.0" +version = "1.11.0" description = "Metadata Service" authors = ["CTDS UChicago "] license = "Apache-2.0" diff --git a/src/mds/agg_mds/adapters.py b/src/mds/agg_mds/adapters.py index ed80d86b..26022c3a 100644 --- a/src/mds/agg_mds/adapters.py +++ b/src/mds/agg_mds/adapters.py @@ -19,6 +19,8 @@ def strip_email(text: str): + if not isinstance(text, str): + return text rgx = r"[\w.+-]+@[\w-]+\.[\w.-]+" matches = re.findall(rgx, text) for cur_word in matches: @@ -27,14 +29,20 @@ def strip_email(text: str): def strip_html(s: str): + if not isinstance(s, str): + return s return bleach.clean(s, tags=[], strip=True) def add_icpsr_source_url(study_id: str): + if not isinstance(study_id, str): + return study_id return f"https://www.icpsr.umich.edu/web/NAHDAP/studies/{study_id}" def add_clinical_trials_source_url(study_id: str): + if not isinstance(study_id, str): + return study_id return f"https://clinicaltrials.gov/ct2/show/{study_id}" @@ -54,14 +62,20 @@ def execute(cls, name, value): return FieldFilters.filters[name](value) -def get_json_path_value(expression: str, item: dict) -> Union[str, List[Any]]: +def get_json_path_value( + expression: str, + item: dict, + has_default_value: bool = False, + default_value: str = "", +) -> Union[str, List[Any]]: """ Given a JSON Path expression and a dictionary, using the path expression - to find the value. If not found return an empty string + to find the value. 
If not found return and default value define return it, else + return None """ if expression is None: - return "" + return default_value if has_default_value else None try: jsonpath_expr = parse(expression) @@ -69,11 +83,11 @@ def get_json_path_value(expression: str, item: dict) -> Union[str, List[Any]]: logger.error( f"Invalid JSON Path expression {exc} . See https://github.com/json-path/JsonPath. Returning ''" ) - return "" + return default_value if has_default_value else None v = jsonpath_expr.find(item) - if len(v) == 0: # nothing found use default value of empty string - return "" + if len(v) == 0: # nothing found, deal with this + return default_value if has_default_value else None if len(v) == 1: # convert array length 1 to a value return v[0].value @@ -116,7 +130,7 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict: """needs to be implemented in derived class""" @staticmethod - def mapFields(item: dict, mappings: dict, global_filters=None) -> dict: + def mapFields(item: dict, mappings: dict, global_filters=None, schema=None) -> dict: """ Given a MetaData entry as a dict, and dictionary describing fields to add and optionally where to map an item entry from. @@ -132,13 +146,17 @@ def mapFields(item: dict, mappings: dict, global_filters=None) -> dict: field: { path: JSON Path filters: [process field filters] + default_value(optional): Any Value } :param item: dictionary to map fields to - :param mappings: + :param mappings: dictionary describing fields to add :return: """ + if schema is None: + schema = {} + if global_filters is None: global_filters = [] @@ -147,7 +165,23 @@ def mapFields(item: dict, mappings: dict, global_filters=None) -> dict: for key, value in mappings.items(): if isinstance(value, dict): # have a complex assignment expression = value.get("path", None) - field_value = get_json_path_value(expression, item) + + hasDefaultValue = False + default_value = None + # get adapter's default value if set + if "default" in value: + hasDefaultValue = True + default_value = value["default"] + + # get schema default value if set + if hasDefaultValue is False: + if key in schema and schema[key].default is not None: + hasDefaultValue = True + default_value = schema[key].default + + field_value = get_json_path_value( + expression, item, hasDefaultValue, default_value + ) filters = value.get("filters", []) for filter in filters: @@ -156,12 +190,34 @@ def mapFields(item: dict, mappings: dict, global_filters=None) -> dict: elif isinstance(value, str) and "path:" in value: # process as json path expression = value.split("path:")[1] - field_value = get_json_path_value(expression, item) + + hasDefaultValue = False + default_value = None + if key in schema: + d = schema[key].default + if d is not None: + hasDefaultValue = True + default_value = d + + field_value = get_json_path_value( + expression, item, hasDefaultValue, default_value + ) else: field_value = value for f in global_filters: field_value = FieldFilters.execute(f, field_value) + if key in schema: + field_value = schema[key].normalize_value(field_value) + # set to default if conversion failed and a default value is available + if field_value is None: + if hasDefaultValue: + field_value = default_value + else: + logger.warn( + f"{key} = None{', is not in the schema,' if key not in schema else ''} " + f"and has no default value. 
Consider adding {key} to the schema" + ) results[key] = field_value return results @@ -221,7 +277,7 @@ def getRemoteDataAsJson(self, **kwargs) -> Tuple[Dict, str]: raise except httpx.HTTPError as exc: logger.error( - f"An HTTP error { exc.response.status_code if exc.response is not None else '' } occurred while requesting {exc.request.url}. Skipping {id}" + f"An HTTP error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}. Skipping {id}" ) break except Exception as exc: @@ -313,7 +369,7 @@ def getRemoteDataAsJson(self, **kwargs) -> Dict: raise except httpx.HTTPError as exc: logger.error( - f"An HTTP error { exc.response.status_code if exc.response is not None else '' } occurred while requesting {exc.request.url}. Skipping {id}" + f"An HTTP error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}. Skipping {id}" ) except ValueError as exc: logger.error( @@ -331,11 +387,13 @@ def buildIdentifier(id: str): return id.replace("http://doi.org/", "").replace("dc:", "") @staticmethod - def addGen3ExpectedFields(item, mappings, keepOriginalFields, globalFieldFilters): + def addGen3ExpectedFields( + item, mappings, keepOriginalFields, globalFieldFilters, schema + ): results = item if mappings is not None: mapped_fields = RemoteMetadataAdapter.mapFields( - item, mappings, globalFieldFilters + item, mappings, globalFieldFilters, schema ) if keepOriginalFields: results.update(mapped_fields) @@ -360,6 +418,7 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: mappings = kwargs.get("mappings", None) keepOriginalFields = kwargs.get("keepOriginalFields", True) globalFieldFilters = kwargs.get("globalFieldFilters", []) + schema = kwargs.get("schema", {}) results = {} for record in data["results"]: @@ -375,7 +434,7 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: else: item[str.replace(key, "dc:", "")] = value normalized_item = ISCPSRDublin.addGen3ExpectedFields( - item, mappings, keepOriginalFields, globalFieldFilters + item, mappings, keepOriginalFields, globalFieldFilters, schema ) results[item["identifier"]] = { "_guid_type": "discovery_metadata", @@ -464,24 +523,26 @@ def getRemoteDataAsJson(self, **kwargs) -> Dict: raise except httpx.HTTPError as exc: logger.error( - f"An HTTP error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}. Returning { len(results['results'])} results" + f"An HTTP error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}. Returning {len(results['results'])} results" ) break # need to break here as cannot be assured of leaving while loop except ValueError as exc: logger.error( - f"An error occurred while requesting {mds_url} {exc}. Returning { len(results['results'])} results." + f"An error occurred while requesting {mds_url} {exc}. Returning {len(results['results'])} results." ) break except Exception as exc: logger.error( - f"An error occurred while requesting {mds_url} {exc}. Returning { len(results['results'])} results." + f"An error occurred while requesting {mds_url} {exc}. Returning {len(results['results'])} results." 
) break return results @staticmethod - def addGen3ExpectedFields(item, mappings, keepOriginalFields, globalFieldFilters): + def addGen3ExpectedFields( + item, mappings, keepOriginalFields, globalFieldFilters, schema + ): """ Map item fields to gen3 normalized fields using the mapping and adding the location @@ -489,7 +550,7 @@ def addGen3ExpectedFields(item, mappings, keepOriginalFields, globalFieldFilters results = item if mappings is not None: mapped_fields = RemoteMetadataAdapter.mapFields( - item, mappings, globalFieldFilters + item, mappings, globalFieldFilters, schema ) if keepOriginalFields: results.update(mapped_fields) @@ -517,13 +578,14 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: mappings = kwargs.get("mappings", None) keepOriginalFields = kwargs.get("keepOriginalFields", True) globalFieldFilters = kwargs.get("globalFieldFilters", []) + schema = kwargs.get("schema", {}) results = {} for item in data["results"]: item = item["Study"] item = flatten(item) normalized_item = ClinicalTrials.addGen3ExpectedFields( - item, mappings, keepOriginalFields, globalFieldFilters + item, mappings, keepOriginalFields, globalFieldFilters, schema ) results[item["NCTId"]] = { "_guid_type": "discovery_metadata", @@ -577,13 +639,15 @@ def getRemoteDataAsJson(self, **kwargs) -> Dict: raise except httpx.HTTPError as exc: logger.error( - f"An HTTP error { exc.response.status_code if exc.response is not None else '' } occurred while requesting {exc.request.url}. Skipping {id}" + f"An HTTP error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}. Skipping {id}" ) return results @staticmethod - def addGen3ExpectedFields(item, mappings, keepOriginalFields, globalFieldFilters): + def addGen3ExpectedFields( + item, mappings, keepOriginalFields, globalFieldFilters, schema + ): """ Maps the items fields into Gen3 resources fields if keepOriginalFields is False: only those fields will be included in the final entry @@ -591,7 +655,7 @@ def addGen3ExpectedFields(item, mappings, keepOriginalFields, globalFieldFilters results = item if mappings is not None: mapped_fields = RemoteMetadataAdapter.mapFields( - item, mappings, globalFieldFilters + item, mappings, globalFieldFilters, schema ) if keepOriginalFields: results.update(mapped_fields) @@ -614,6 +678,7 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: mappings = kwargs.get("mappings", None) keepOriginalFields = kwargs.get("keepOriginalFields", True) globalFieldFilters = kwargs.get("globalFieldFilters", []) + schema = kwargs.get("schema", {}) results = {} for item in data["results"]: @@ -622,7 +687,7 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: if "id" in item: item["display_id"] = item["id"] normalized_item = PDAPS.addGen3ExpectedFields( - item, mappings, keepOriginalFields, globalFieldFilters + item, mappings, keepOriginalFields, globalFieldFilters, schema ) if "display_id" in item: results[item["display_id"]] = { @@ -721,7 +786,7 @@ def getRemoteDataAsJson(self, **kwargs) -> Tuple[Dict, str]: for var_iter, var in enumerate(vars): data_file["data_dictionary"].append( { - "name": var.get("@name", f"var{var_iter+1}"), + "name": var.get("@name", f"var{var_iter + 1}"), "label": var.get("labl", {}).get("#text"), "interval": var.get("@intrvl"), "type": var.get("varFormat", {}).get("@type"), @@ -737,17 +802,17 @@ def getRemoteDataAsJson(self, **kwargs) -> Tuple[Dict, str]: raise except httpx.HTTPError as exc: logger.error( - f"An HTTP 
error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}. Returning { len(results['results'])} results" + f"An HTTP error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}. Returning {len(results['results'])} results" ) break # need to break here as cannot be assured of leaving while loop except ValueError as exc: logger.error( - f"An error occurred while requesting {mds_url} {exc}. Returning { len(results['results'])} results." + f"An error occurred while requesting {mds_url} {exc}. Returning {len(results['results'])} results." ) break except Exception as exc: logger.error( - f"An error occurred while requesting {mds_url} {exc}. Returning { len(results['results'])} results." + f"An error occurred while requesting {mds_url} {exc}. Returning {len(results['results'])} results." ) break @@ -814,9 +879,9 @@ class Gen3Adapter(RemoteMetadataAdapter): """ @retry( - stop=stop_after_attempt(5), + stop=stop_after_attempt(10), retry=retry_if_exception_type(httpx.TimeoutException), - wait=wait_random_exponential(multiplier=1, max=20), + wait=wait_random_exponential(multiplier=1, max=60), before_sleep=before_sleep_log(logger, logging.DEBUG), ) def getRemoteDataAsJson(self, **kwargs) -> Dict: @@ -826,20 +891,27 @@ def getRemoteDataAsJson(self, **kwargs) -> Dict: if mds_url is None: return results + if mds_url[-1] != "/": + mds_url += "/" + config = kwargs.get("config", {}) guid_type = config.get("guid_type", "discovery_metadata") field_name = config.get("field_name", None) field_value = config.get("field_value", None) + filters = config.get("filters", None) batchSize = config.get("batchSize", 1000) maxItems = config.get("maxItems", None) offset = 0 limit = min(maxItems, batchSize) if maxItems is not None else batchSize moreData = True + # extend httpx timeout + # timeout = httpx.Timeout(connect=60, read=120, write=5, pool=60) while moreData: - url = f"{mds_url}mds/metadata?data=True&_guid_type={guid_type}&limit={limit}&offset={offset}" try: url = f"{mds_url}mds/metadata?data=True&_guid_type={guid_type}&limit={limit}&offset={offset}" + if filters: + url += f"&{filters}" if field_name is not None and field_value is not None: url += f"&{guid_type}.{field_name}={field_value}" response = httpx.get(url) @@ -849,16 +921,16 @@ def getRemoteDataAsJson(self, **kwargs) -> Dict: results["results"].update(data) numReturned = len(data) - if numReturned == 0 or numReturned < limit: + if numReturned == 0 or numReturned <= limit: moreData = False offset += numReturned - except httpx.TimeoutException as exc: + except httpx.TimeoutException: logger.error(f"An timeout error occurred while requesting {url}.") raise except httpx.HTTPError as exc: logger.error( - f"An HTTP error { exc.response.status_code if exc.response is not None else '' } occurred while requesting {exc.request.url}. Returning { len(results['results'])} results." + f"An HTTP error {exc if exc is not None else ''} occurred while requesting {exc.request.url}. Returning {len(results['results'])} results." 
) break @@ -866,7 +938,7 @@ def getRemoteDataAsJson(self, **kwargs) -> Dict: @staticmethod def addGen3ExpectedFields( - item, mappings, keepOriginalFields, globalFieldFilters + item, mappings, keepOriginalFields, globalFieldFilters, schema ) -> Dict[str, Any]: """ Given an item (metadata as a dict), map the item's keys into @@ -881,7 +953,7 @@ def addGen3ExpectedFields( results = item if mappings is not None: mapped_fields = RemoteMetadataAdapter.mapFields( - item, mappings, globalFieldFilters + item, mappings, globalFieldFilters, schema ) if keepOriginalFields: results.update(mapped_fields) @@ -907,11 +979,19 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: study_field = config.get("study_field", "gen3_discovery") keepOriginalFields = kwargs.get("keepOriginalFields", True) globalFieldFilters = kwargs.get("globalFieldFilters", []) + schema = kwargs.get("schema", {}) results = {} for guid, record in data["results"].items(): + if study_field not in record: + logger.error(f"Study field not in record. Skipping") + continue item = Gen3Adapter.addGen3ExpectedFields( - record[study_field], mappings, keepOriginalFields, globalFieldFilters + record[study_field], + mappings, + keepOriginalFields, + globalFieldFilters, + schema, ) results[guid] = { "_guid_type": "discovery_metadata", @@ -925,6 +1005,82 @@ def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: return results +class DRSIndexdAdapter(RemoteMetadataAdapter): + """ + Pulls the DRS hostname from a ga4gh (indexd) server to cache + them to support local compact DRS resolution. + """ + + @staticmethod + def clean_dist_entry(s: str) -> str: + """ + Cleans the string returning a proper DRS prefix + @param s: string to clean + @return: cleaned string + """ + return s.replace("\\.", ".").replace(".*", "") + + @staticmethod + def clean_http_url(s: str) -> str: + """ + Cleans input string removing http(s) prefix and all trailing paths + @param s: string to clean + @return: cleaned string + """ + return ( + s.replace("/index", "")[::-1] + .replace("/", "", 1)[::-1] + .replace("http://", "") + .replace("https://", "") + .replace("/ga4gh/drs/v1/objects", "") + ) + + def getRemoteDataAsJson(self, **kwargs) -> Dict: + from datetime import datetime, timezone + + results = {"results": {}} + + mds_url = kwargs.get("mds_url", None) + if mds_url is None: + return results + + try: + response = httpx.get(f"{mds_url}/index/_dist") + response.raise_for_status() + data = response.json() + # process the entries and create a DRS cache + results = { + "info": { + "created": datetime.now(timezone.utc).strftime( + "%m/%d/%Y %H:%M:%S:%Z" + ) + }, + "cache": {}, + } + for entry in data: + if entry["type"] != "indexd": + continue + host = DRSIndexdAdapter.clean_http_url(entry["host"]) + name = entry.get("name", "") + for x in entry["hints"]: + drs_prefix = DRSIndexdAdapter.clean_dist_entry(x) + results["cache"][drs_prefix] = { + "host": host, + "name": name, + "type": entry["type"], + } + + except httpx.HTTPError as exc: + logger.error( + f"An HTTP error {exc.response.status_code if exc.response is not None else ''} occurred while requesting {exc.request.url}." 
+ ) + + return results + + def normalizeToGen3MDSFields(self, data, **kwargs) -> Dict[str, Any]: + return data + + def gather_metadata( gather, mds_url, @@ -934,6 +1090,7 @@ def gather_metadata( perItemValues, keepOriginalFields, globalFieldFilters, + schema, ): try: json_data = gather.getRemoteDataAsJson( @@ -946,6 +1103,7 @@ def gather_metadata( perItemValues=perItemValues, keepOriginalFields=keepOriginalFields, globalFieldFilters=globalFieldFilters, + schema=schema, ) return results except ValueError as exc: @@ -961,6 +1119,7 @@ def gather_metadata( "pdaps": PDAPS, "mps": MPSAdapter, "gen3": Gen3Adapter, + "drs_indexd": DRSIndexdAdapter, "harvard_dataverse": HarvardDataverse, } @@ -974,6 +1133,7 @@ def get_metadata( perItemValues=None, keepOriginalFields=False, globalFieldFilters=None, + schema=None, ): if config is None: config = {} @@ -1000,4 +1160,5 @@ def get_metadata( perItemValues=perItemValues, keepOriginalFields=keepOriginalFields, globalFieldFilters=globalFieldFilters, + schema=schema, ) diff --git a/src/mds/agg_mds/commons.py b/src/mds/agg_mds/commons.py index b577fad4..69677a1e 100644 --- a/src/mds/agg_mds/commons.py +++ b/src/mds/agg_mds/commons.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field from dataclasses_json import dataclass_json -from typing import Any, Dict, List, Optional -from datetime import datetime +from typing import Any, Dict, List, Optional, Union, TypeVar +from mds import logger import json @@ -10,7 +10,7 @@ class ColumnsToFields: """ A more complex mapping object for mapping column names to MDS fields - allows to explictly mark a field as missing, a default value and it's resources type + allows to explicitly mark a field as missing, a default value and it's resources type """ name: str @@ -18,17 +18,195 @@ class ColumnsToFields: default: str = "" type: str = "string" + def get_value(self, info: dict): + return info.get(self.name, self.default) + + +@dataclass_json +@dataclass +class FieldAggregation: + """ + Provides a description of what fields to compute summary information. 
+ The default assumes computing the sum of the field, assuming it is a number + the functions supported are: sum and count + """ + + type: str = "number" + function: str = "sum" + chart: str = "text" + + +FieldDefinition = TypeVar("FieldDefinition") + + +def string_to_array(s: str) -> Optional[List[str]]: + if s == "": + return [] + return [s] + + +def array_to_string(arr: Optional[list]) -> Optional[str]: + if arr is None: + logger.error(f"array is None") + return None + return "".join(arr) + + +def string_to_integer(s: str) -> int: + if not s.isnumeric(): + logger.error(f"{s} does not represent a number") + return int(s) if s.isnumeric() else None + + +def string_to_number(s: str) -> Optional[float]: + try: + return float(s) + except ValueError: + logger.error(f"{s} failed to convert to a float") + return None + + +def string_to_dict(s: str) -> Optional[Dict[Any, Any]]: + try: + return json.loads(s) + except json.JSONDecodeError: + logger.error(f"{s} failed to convert to JSON ") + return None + + +def dict_to_array(d: dict) -> List[Dict[Any, Any]]: + return [d] + + +@dataclass_json +@dataclass +class FieldDefinition: + """ + Provides a description of a field defined in the metadata + While other fields are defined dynamically, these help "tune" + certain fields + * type: one of string, number, object, nested (deeper object) + """ + + type: str = "string" + description: str = "" + default: Optional[Any] = None + properties: Optional[Dict[str, FieldDefinition]] = None + items: Optional[Dict[str, str]] = None + + ES_TYPE_MAPPING = { + "array": "nested", + "object": "nested", + "string": "keyword", + "integer": "long", + "number": "float", + } + + FIELD_NORMALIZATION = { + "string_to_number": string_to_number, + "string_to_integer": string_to_integer, + "string_to_object": string_to_dict, + "object_to_array": dict_to_array, + "string_to_array": string_to_array, + "array_to_string": array_to_string, + } + + MAP_TYPE_TO_JSON_SCHEMA_TYPES = { + "str": "string", + "int": "integer", + "list": "array", + "dict": "object", + } + + def has_default_value(self): + return self.default is not None + + def __post_init__(self): + if self.properties is not None: + self.properties = { + k: FieldDefinition.from_dict(v) for k, v in self.properties.items() + } + + def get_es_type(self): + field_type = FieldDefinition.ES_TYPE_MAPPING.get(self.type, self.type) + if self.type == "array" and self.items and self.items["type"] == "string": + field_type = "keyword" + + if field_type == "keyword": + return { + "type": field_type, + "fields": { + "analyzed": { + "type": "text", + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + } + }, + } + + return {"type": field_type} + + def to_schema(self, es_types: bool = False, all_fields: bool = False): + """ + Maps the FieldDefinition to either a JSON schema or an Elasticsearch mapping + """ + res = self.get_es_type() if es_types else {"type": self.type} + if self.properties is not None: + res["properties"] = { + k: v.to_schema(es_types, all_fields) for k, v in self.properties.items() + } + if all_fields: + if self.items is not None: + res["items"] = self.items + if self.description is not None: + res["description"] = self.description + if self.default is not None: + res["default"] = self.default + return res + + def normalize_value(self, value) -> Any: + value_type = FieldDefinition.MAP_TYPE_TO_JSON_SCHEMA_TYPES.get( + type(value).__name__, type(value).__name__ + ) + + if value_type == self.type: + return value 
+ + conversion = f"{value_type}_to_{self.type}" + converter = FieldDefinition.FIELD_NORMALIZATION.get(conversion, None) + if converter is None: + logger.warning( + f"warning normalizing {value} via converter {conversion} not applied." + ) + return value + return converter(value) + @dataclass_json @dataclass class MDSInstance: + """ + Handles pulling and processing data from a Gen3 metadata-service + """ + mds_url: str commons_url: str - columns_to_fields: Optional[Dict[str, Any]] = None + columns_to_fields: Optional[ + Union[Dict[str, str], Dict[str, ColumnsToFields]] + ] = None study_data_field: str = "gen3_discovery" guid_type: str = "discovery_metadata" select_field: Optional[Dict[str, str]] = None + def __post_init__(self): + if self.columns_to_fields is None: + return + + for name, value in self.columns_to_fields.items(): + if isinstance(value, dict): + self.columns_to_fields[name] = ColumnsToFields.from_dict(value) + @dataclass_json @dataclass @@ -43,27 +221,44 @@ class AdapterMDSInstance: study_data_field: str = "gen3_discovery" keep_original_fields: bool = True global_field_filters: List[str] = field(default_factory=list) + commons_name: Optional[str] = None + + +@dataclass_json +@dataclass +class Settings: + cache_drs: bool = False + drs_indexd_server: str = "https://dataguids.org" + timestamp_entry: bool = False + + +@dataclass_json +@dataclass +class Config: + settings: Optional[Settings] = field(default_factory=dict) + schema: Optional[Dict[str, FieldDefinition]] = field(default_factory=dict) + aggregations: Optional[Dict[str, FieldAggregation]] = field(default_factory=dict) + search_settings: Optional[Dict[str, FieldAggregation]] = field(default_factory=dict) @dataclass_json @dataclass class Commons: - gen3_commons: Dict[str, MDSInstance] - adapter_commons: Dict[str, AdapterMDSInstance] - aggregation: List[str] = field( - default_factory=lambda: ["_unique_id", "_subjects_count"] - ) + configuration: Optional[Config] = None + gen3_commons: Dict[str, MDSInstance] = field(default_factory=dict) + adapter_commons: Dict[str, AdapterMDSInstance] = field(default_factory=dict) + aggregations: Optional[Dict[str, FieldAggregation]] = field(default_factory=dict) + fields: Optional[Dict[str, FieldDefinition]] = field(default_factory=dict) + + def __post_init__(self): + if self.configuration is None: + self.configuration = Config(settings=Settings()) -def parse_config(data: Dict[str, Any]) -> Commons: +def parse_config(data: str) -> Commons: """ - parses a aggregated config which defines the list of MDS services and the mapping of field to column names + parses an aggregated config which defines the list of MDS services and the mapping of field to column names for the Ecosystem browser. 
Returns a dictionary of MDSInfo entries """ - return Commons.from_dict( - { - "gen3_commons": data.get("gen3_commons", dict()), - "adapter_commons": data.get("adapter_commons", dict()), - } - ) + return Commons.from_json(data) diff --git a/src/mds/agg_mds/datastore/__init__.py b/src/mds/agg_mds/datastore/__init__.py index a049d1e2..36c17b1f 100644 --- a/src/mds/agg_mds/datastore/__init__.py +++ b/src/mds/agg_mds/datastore/__init__.py @@ -13,8 +13,24 @@ async def init(hostname, port): await client.init(hostname, port) -async def drop_all(): - await client.drop_all() +async def drop_all_non_temp_indexes(): + await client.drop_all_non_temp_indexes() + + +async def drop_all_temp_indexes(): + await client.drop_all_temp_indexes() + + +async def create_indexes(commons_mapping): + await client.create_indexes(commons_mapping) + + +async def create_temp_indexes(commons_mapping): + await client.create_temp_indexes(commons_mapping) + + +async def clone_temp_indexes_to_real_indexes(): + await client.clone_temp_indexes_to_real_indexes() async def close(): @@ -32,10 +48,22 @@ async def update_metadata(*args): await client.update_metadata(*args) +async def update_global_info(*args): + await client.update_global_info(*args) + + +async def update_config_info(*args): + await client.update_config_info(*args) + + async def get_commons_metadata(*args): return await client.get_commons_metadata(*args) +async def get_all_tags(): + return await client.metadata_tags() + + async def get_all_named_commons_metadata(*args): return await client.get_all_named_commons_metadata(*args) @@ -54,11 +82,3 @@ async def get_commons(): async def get_all_metadata(*args): return await client.get_all_metadata(*args) - - -async def get_aggregations(*args): - return await client.get_aggregations(*args) - - -async def search(*args): - return await client.search(*args) diff --git a/src/mds/agg_mds/datastore/elasticsearch_dao.py b/src/mds/agg_mds/datastore/elasticsearch_dao.py index af32e014..cc2a8319 100644 --- a/src/mds/agg_mds/datastore/elasticsearch_dao.py +++ b/src/mds/agg_mds/datastore/elasticsearch_dao.py @@ -1,55 +1,64 @@ -from elasticsearch import Elasticsearch, exceptions as es_exceptions -from typing import List, Dict -import json +from elasticsearch import Elasticsearch, exceptions as es_exceptions, helpers +from typing import Any, List, Dict, Union, Optional, Tuple +from math import ceil from mds import logger -from mds.config import AGG_MDS_NAMESPACE -import pydash - - -# TODO WFH Why do we have both __manifest and _file_manifest? -# TODO WFH These are bugs. If we have to check whether an object is a string or -# an object, the data is bad. 
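The hard-coded normalizer table removed below is superseded by the schema-driven `FieldDefinition.normalize_value` added to `commons.py` earlier in this diff. A minimal sketch of the new flow (hypothetical field definitions; this assumes only the `FieldDefinition` class shown above):

```python
# Minimal sketch: schema-driven normalization replacing the removed
# FIELD_NORMALIZERS table. FieldDefinition comes from mds.agg_mds.commons.
from mds.agg_mds.commons import FieldDefinition

manifest_field = FieldDefinition(type="array")
subjects_field = FieldDefinition(type="integer")

# The "string_to_array" converter wraps a bare string in a list; "" becomes []
assert manifest_field.normalize_value("") == []
# The "string_to_integer" converter parses numeric strings; non-numeric -> None
assert subjects_field.normalize_value("48") == 48
```
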
-FIELD_NORMALIZERS = { - "__manifest": "object", - "_file_manifest": "object", - "advSearchFilters": "object", - "data_dictionary": "object", - "sites": "number", - "budget_end": "date", - "date_added": "date", - "budget_start": "date", - "project_end_date": "date", - "award_notice_date": "date", - "project_start_date": "date", - "Data Availability.data_collection_finish_date": "date", - "Data Availability.data_collection_start_date": "date", - "Data Availability.data_release_finish_date": "date", - "Data Availability.data_release_start_date": "date", -} - +from mds.config import AGG_MDS_NAMESPACE, ES_RETRY_LIMIT, ES_RETRY_INTERVAL AGG_MDS_INDEX = f"{AGG_MDS_NAMESPACE}-commons-index" AGG_MDS_TYPE = "commons" - +AGG_MDS_INDEX_TEMP = f"{AGG_MDS_NAMESPACE}-commons-index-temp" AGG_MDS_INFO_INDEX = f"{AGG_MDS_NAMESPACE}-commons-info-index" AGG_MDS_INFO_TYPE = "commons-info" +AGG_MDS_INFO_INDEX_TEMP = f"{AGG_MDS_NAMESPACE}-commons-info-index-temp" +AGG_MDS_CONFIG_INDEX = f"{AGG_MDS_NAMESPACE}-commons-config-index" +AGG_MDS_CONFIG_TYPE = "commons-config" +AGG_MDS_CONFIG_INDEX_TEMP = f"{AGG_MDS_NAMESPACE}-commons-config-index-temp" -MAPPING = { +# Setting Commons Info ES index to only store documents +# will not be searching on it +INFO_MAPPING = { "mappings": { - "commons": { - "properties": { - "__manifest": { - "type": "nested", - }, - "tags": { - "type": "nested", + AGG_MDS_INFO_TYPE: { + "dynamic": False, + } + } +} + +CONFIG = { + "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0}}, + "mappings": {"_doc": {"properties": {"array": {"type": "keyword"}}}}, +} + +SEARCH_CONFIG = { + "settings": { + "index": { + "mapping.ignore_malformed": True, + "number_of_shards": 1, + "number_of_replicas": 0, + "analysis": { + "tokenizer": { + "ngram_tokenizer": { + "type": "ngram", + "min_gram": 3, + "max_gram": 3, + "token_chars": ["letter", "digit"], + } }, - "data_dictionary": { - "type": "nested", + "analyzer": { + "ngram_analyzer": { + "type": "custom", + "tokenizer": "ngram_tokenizer", + "filter": ["lowercase"], + }, + "search_analyzer": { + "type": "custom", + "tokenizer": "keyword", + "filter": "lowercase", + }, }, - } + }, } } } @@ -63,19 +72,43 @@ async def init(hostname: str = "0.0.0.0", port: int = 9200): [hostname], scheme="http", port=port, - timeout=30, - max_retries=7, + timeout=ES_RETRY_INTERVAL, + max_retries=ES_RETRY_LIMIT, retry_on_timeout=True, ) -async def drop_all(): - for index in [AGG_MDS_INDEX, AGG_MDS_INFO_INDEX]: +async def drop_all_non_temp_indexes(): + for index in [AGG_MDS_INDEX, AGG_MDS_INFO_INDEX, AGG_MDS_CONFIG_INDEX]: + res = elastic_search_client.indices.delete(index=index, ignore=[400, 404]) + logger.debug(f"deleted index: {index}: {res}") + + +async def drop_all_temp_indexes(): + for index in [ + AGG_MDS_INDEX_TEMP, + AGG_MDS_INFO_INDEX_TEMP, + AGG_MDS_CONFIG_INDEX_TEMP, + ]: res = elastic_search_client.indices.delete(index=index, ignore=[400, 404]) - logger.debug(f"deleted index: {index}") + logger.debug(f"deleted index: {index}: {res}") + +async def clone_temp_indexes_to_real_indexes(): + for index in [AGG_MDS_INDEX, AGG_MDS_INFO_INDEX, AGG_MDS_CONFIG_INDEX]: + source_index = index + "-temp" + reqBody = {"source": {"index": source_index}, "dest": {"index": index}} + logger.debug(f"Cloning index: {source_index} to {index}...") + res = Elasticsearch.reindex(elastic_search_client, reqBody) + # Elasticsearch >7.4 introduces the clone api we could use later on + # res = elastic_search_client.indices.clone(index=source_index, target=index) + 
logger.debug(f"Cloned index: {source_index} to {index}: {res}") + + +async def create_indexes(common_mapping: dict): try: - res = elastic_search_client.indices.create(index=AGG_MDS_INDEX, body=MAPPING) + mapping = {**SEARCH_CONFIG, **common_mapping} + res = elastic_search_client.indices.create(index=AGG_MDS_INDEX, body=mapping) logger.debug(f"created index {AGG_MDS_INDEX}: {res}") except es_exceptions.RequestError as ex: if ex.error == "resource_already_exists_exception": @@ -86,7 +119,7 @@ async def drop_all(): try: res = elastic_search_client.indices.create( - index=AGG_MDS_INFO_INDEX, + index=AGG_MDS_INFO_INDEX, body=INFO_MAPPING ) logger.debug(f"created index {AGG_MDS_INFO_INDEX}: {res}") @@ -97,23 +130,57 @@ async def drop_all(): else: # Other exception - raise it raise ex + try: + res = elastic_search_client.indices.create( + index=AGG_MDS_CONFIG_INDEX, body=CONFIG + ) + logger.debug(f"created index {AGG_MDS_CONFIG_INDEX}: {res}") + except es_exceptions.RequestError as ex: + if ex.error == "resource_already_exists_exception": + logger.warning(f"index already exists: {AGG_MDS_CONFIG_INDEX}") + pass # Index already exists. Ignore. + else: # Other exception - raise it + raise ex + -def normalize_field(doc, key, normalize_type): +async def create_temp_indexes(common_mapping: dict): try: - if normalize_type == "object" and isinstance(doc[key], str): - value = doc[key] - doc[key] = None if value == "" else json.loads(value) - if normalize_type == "number" and isinstance(pydash.get(doc, key), str): - pydash.set_(doc, key, None) - if ( - normalize_type == "date" - and isinstance(pydash.get(doc, key), str) - and pydash.get(doc, key) == "" - ): - pydash.set_(doc, key, None) - except Exception: - logger.debug(f"error normalizing {key} for a document") - doc[key] = None + mapping = {**SEARCH_CONFIG, **common_mapping} + res = elastic_search_client.indices.create( + index=AGG_MDS_INDEX_TEMP, body=mapping + ) + logger.debug(f"created index {AGG_MDS_INDEX_TEMP}: {res}") + except es_exceptions.RequestError as ex: + if ex.error == "resource_already_exists_exception": + logger.warning(f"index already exists: {AGG_MDS_INDEX_TEMP}") + pass # Index already exists. Ignore. + else: # Other exception - raise it + raise ex + + try: + res = elastic_search_client.indices.create( + index=AGG_MDS_INFO_INDEX_TEMP, body=INFO_MAPPING + ) + logger.debug(f"created index {AGG_MDS_INFO_INDEX_TEMP}: {res}") + + except es_exceptions.RequestError as ex: + if ex.error == "resource_already_exists_exception": + logger.warning(f"index already exists: {AGG_MDS_INFO_INDEX_TEMP}") + pass # Index already exists. Ignore. + else: # Other exception - raise it + raise ex + + try: + res = elastic_search_client.indices.create( + index=AGG_MDS_CONFIG_INDEX_TEMP, body=CONFIG + ) + logger.debug(f"created index {AGG_MDS_CONFIG_INDEX_TEMP}: {res}") + except es_exceptions.RequestError as ex: + if ex.error == "resource_already_exists_exception": + logger.warning(f"index already exists: {AGG_MDS_CONFIG_INDEX_TEMP}") + pass # Index already exists. Ignore. 
+        else:  # Other exception - raise it
+            raise ex


 async def update_metadata(
@@ -123,26 +190,47 @@ async def update_metadata(
     tags: Dict[str, List[str]],
     info: Dict[str, str],
     study_data_field: str,
+    use_temp_index: bool = False,
 ):
+    index_to_update = AGG_MDS_INFO_INDEX_TEMP if use_temp_index else AGG_MDS_INFO_INDEX
     elastic_search_client.index(
-        index=AGG_MDS_INFO_INDEX,
+        index=index_to_update,
         doc_type=AGG_MDS_INFO_TYPE,
         id=name,
         body=info,
     )

+    index_to_update = AGG_MDS_INDEX_TEMP if use_temp_index else AGG_MDS_INDEX
     for doc in data:
         key = list(doc.keys())[0]
         # Flatten out this structure
         doc = doc[key][study_data_field]

-        for field in FIELD_NORMALIZERS.keys():
-            if pydash.has(doc, field):
-                normalize_field(doc, field, FIELD_NORMALIZERS[field])
+        try:
+            elastic_search_client.index(
+                index=index_to_update, doc_type=AGG_MDS_TYPE, id=key, body=doc
+            )
+        except Exception as ex:
+            raise ex
+
+
+async def update_global_info(key, doc, use_temp_index: bool = False) -> None:
+    index_to_update = AGG_MDS_INFO_INDEX_TEMP if use_temp_index else AGG_MDS_INFO_INDEX
+    elastic_search_client.index(
+        index=index_to_update, doc_type=AGG_MDS_INFO_TYPE, id=key, body=doc
+    )

-        elastic_search_client.index(
-            index=AGG_MDS_INDEX, doc_type=AGG_MDS_TYPE, id=key, body=doc
-        )
+
+async def update_config_info(doc, use_temp_index: bool = False) -> None:
+    index_to_update = (
+        AGG_MDS_CONFIG_INDEX_TEMP if use_temp_index else AGG_MDS_CONFIG_INDEX
+    )
+    elastic_search_client.index(
+        index=index_to_update,
+        doc_type="_doc",
+        id=AGG_MDS_INDEX,
+        body=doc,
+    )


 async def get_status():
@@ -174,27 +262,119 @@ async def get_commons():
         return []


-async def get_all_metadata(limit, offset):
+def count(value) -> Union[int, Any]:
+    """
+    Returns the length of the value if it is a list or dict, otherwise returns
+    the value itself. If value is None, returns 0.
+    """
+    if value is None:
+        return 0
+    if isinstance(value, dict) or isinstance(value, list):
+        return len(value)
+    return value
+
+
+def process_record(record: dict, counts: Optional[List[str]]) -> Tuple[str, dict]:
+    """
+    Processes an MDS record from a search hit and returns its id and source.
+    If a field named in counts is present in the record, the length of that
+    field is returned in place of its value.
+    """
+    _id = record["_id"]
+    normalized = record["_source"]
+    for c in counts or []:
+        if c in normalized:
+            normalized[c] = count(normalized[c])
+    return _id, normalized
+
+
+async def get_all_metadata(limit, offset, counts: Optional[str] = None, flatten=False):
+    """
+    Queries Elasticsearch for metadata and returns up to limit records
+    offset: starting index of the first returned result
+    counts: comma-separated field names whose values are replaced by their counts
+    returns:
+
+    flatten == true
+        results : MDS results as a dict
+        paging info
+
+    flatten == false
+        results : {
+            commonsA: metadata
+            commonsB: metadata
+            ...
+        },
+        paging info
+
+    The counts parameter provides a way to "compress" an array field to its length.
+ For example: + if the record is: + {"count": [1, 2, 3, 4], "name": "my_name"} + then setting counts=count the result would be: + {"count": 4, "name": "my_name"} + + counts can take a comma separated list of field names: + { + "count": [1, 2, 3, 4], + "__manifest" : [ + { "filename": "filename1.txt", "filesize": 1000 }, + { "filename": "filename2.txt", "filesize": 5555 }, + ], + "name": "my_name" + } + + setting counts=count,__manifest the result would be: + { + "count": 4, + "__manifest" : 2, + "name": "my_name" + } + + if a counts field is not a list or dict then it is unchanged, unless it + is null, in which case the field will be set to 0 + """ try: res = elastic_search_client.search( index=AGG_MDS_INDEX, body={"size": limit, "from": offset, "query": {"match_all": {}}}, ) - byCommons = {} - for record in res["hits"]["hits"]: - id = record["_id"] - normalized = record["_source"] - commons_name = normalized["commons_name"] - if commons_name not in byCommons: - byCommons[commons_name] = [] - byCommons[commons_name].append( - { - id: { - "gen3_discovery": normalized, - } - } - ) - return byCommons + hitsTotal = res["hits"]["total"] + toReduce = counts.split(",") if counts is not None else None + if flatten: + flat = [] + for record in res["hits"]["hits"]: + rid, normalized = process_record(record, toReduce) + flat.append({rid: {"gen3_discovery": normalized}}) + return { + "results": flat, + "pagination": { + "hits": hitsTotal, + "offset": offset, + "pageSize": limit, + "pages": ceil(int(hitsTotal) / limit), + }, + } + else: + byCommons = { + "results": {}, + "pagination": { + "hits": hitsTotal, + "offset": offset, + "pageSize": limit, + "pages": ceil(int(hitsTotal) / limit), + }, + } + for record in res["hits"]["hits"]: + rid, normalized = process_record(record, toReduce) + commons_name = normalized["commons_name"] + if commons_name not in byCommons["results"]: + byCommons["results"][commons_name] = [] + byCommons["results"][commons_name].append( + {rid: {"gen3_discovery": normalized}} + ) + + return byCommons except Exception as error: logger.error(error) return {} @@ -212,9 +392,9 @@ async def get_all_named_commons_metadata(name): return {} -async def metadata_tags(name): +async def metadata_tags(): try: - return elastic_search_client.search( + res = elastic_search_client.search( index=AGG_MDS_INDEX, body={ "size": 0, @@ -237,12 +417,22 @@ async def metadata_tags(name): }, }, ) + results = {} + + for info in res["aggregations"]["tags"]["categories"]["buckets"]: + results[info["key"]] = { + "total": info["doc_count"], + "names": [{x["key"]: x["doc_count"] for x in info["name"]["buckets"]}], + } + + return results + except Exception as error: logger.error(error) return [] -async def get_commons_attribute(name, what): +async def get_commons_attribute(name): try: data = elastic_search_client.search( index=AGG_MDS_INFO_INDEX, diff --git a/src/mds/agg_mds/query.py b/src/mds/agg_mds/query.py index 23b500c9..365acc6c 100644 --- a/src/mds/agg_mds/query.py +++ b/src/mds/agg_mds/query.py @@ -2,49 +2,142 @@ from starlette.status import HTTP_404_NOT_FOUND from mds import config from mds.agg_mds import datastore - +from typing import Any, Dict, List +from pydantic import BaseModel mod = APIRouter() @mod.get("/aggregate/commons") async def get_commons(): - """ - Returns a list of all registered commons - :return: + """Returns a list of all commons with data in the aggregate metadata-service + + Example: + + { commons: ["commonsA", "commonsB" ] } + """ return await datastore.get_commons() 
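For a quick check of these query routes, a client call along the following lines should work against a local deployment (the hostname and port are assumptions matching the docker-compose setup described in the README; `httpx` is already used by this repo's tests):

```python
# Hypothetical smoke test against a local aggregate MDS (localhost:8000 assumed).
import httpx

BASE = "http://localhost:8000"

# List every commons with data in the aggregate index,
# e.g. {"commons": ["commonsA", "commonsB"]}
print(httpx.get(f"{BASE}/aggregate/commons").json())

# First page of metadata; "counts" compresses the __manifest array to its
# length, and "pagination" adds the hits/offset/pageSize/pages object
# described by the /aggregate/metadata route below.
resp = httpx.get(
    f"{BASE}/aggregate/metadata",
    params={"limit": 20, "offset": 0, "counts": "__manifest", "pagination": True},
)
print(resp.json()["pagination"])
```
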
+@mod.get("/aggregate/info/{what}") +async def get_commons_info(what: str): + """Returns status and configuration information about aggregate metadata service. + + Return configuration information. Currently supports only 1 information type: + **schema** + + Example: + + { + schema: { + ... + ... + } + } + + """ + res = await datastore.get_commons_attribute(what) + if res: + return res + else: + raise HTTPException( + HTTP_404_NOT_FOUND, + {"message": f"information for {what} not found", "code": 404}, + ) + + @mod.get("/aggregate/metadata") -async def metadata( +async def get_aggregate_metadata( _: Request, limit: int = Query( - 20, description="Maximum number of records returned. (max: 2000)" + 20, description="Maximum number of records returned. (e.g. max: 2000)" ), offset: int = Query(0, description="Return results at this given offset."), + counts: str = Query( + "", + description="Return count of a field instead of the value if field is an array\ + otherwise field is unchanged. If field is **null** will set field to **0**.\ + Multiple fields can be compressed by comma separating the field names:\ + **files,authors**", + ), + flatten: bool = Query( + False, description="Return the results without grouping items by commons." + ), + pagination: bool = Query( + False, description="If true will return a pagination object in the response" + ), ): - # TODO WFH How to properly return this? We think grouping by MDS is probably - # not ideal in reality. We already have commons_name in the results. - """ - Returns all metadata from all registered commons in the form: - { - "commonA" : { - ... Metadata - }, - "commonB" : { - ... Metadata - } - ... - } + """Returns metadata records + + Returns medata records namespaced by commons as a JSON object. + Example without pagination: + + { + "commonA" : { + ... Metadata + }, + "commonB" : { + ... Metadata + } + ... + } + + The pagination option adds a pagination object to the response: + + { + results: { + "commonA" : { + ... Metadata + }, + "commonB" : { + ... Metadata + } + ... + }, + "pagination": { + "hits": 64, + "offset": 0, + "pageSize": 20, + "pages": 4 + } + } + + The flatten option removes the commons' namespace so all results are a child or results: + + results: { + ... Metadata from commons A + ... Metadata from commons B + } + ... + }, + + + The counts options when applied to an array or dictionary will replace + the field value with its length. If the field values is None it will replace it with 0. + All other types will be unchanged. """ - return await datastore.get_all_metadata(limit, offset) + results = await datastore.get_all_metadata(limit, offset, counts, flatten) + if pagination is False: + return results.get("results", {}) + return results @mod.get("/aggregate/metadata/{name}") -async def metadata_name(name: str): - """ - Returns the all the metadata from the named commons. +async def get_aggregate_metadata_for_commons( + name: str = Query( + False, description="Return the results without grouping items by commons." + ) +): + """et all metadata records from a commons by name + + Returns an array containing all the metadata entries for a single commons. + There are no limit/offset parameters. + + Example: + + [ { id2: { name: "bear" } } , { id3: { name: "cat" } }] + """ res = await datastore.get_all_named_commons_metadata(name) if res: @@ -56,39 +149,53 @@ async def metadata_name(name: str): ) -@mod.get("/aggregate/metadata/{name}/tags") -async def metadata_tags(name: str): - """ - Returns the tags associated with the named commons. 
+@mod.get("/aggregate/tags") +async def get_aggregate_tags(): + """Returns aggregate category, name and counts across all commons + + Example: + + { + "Data Type": { + "total": 275, + "names": [ + { + "Genotype": 103, + "Clinical Phenotype": 100, + "DCC Harmonized": 24, + "WGS": 20, + "SNP/CNV Genotypes (NGS)": 6, + "RNA-Seq": 5, + "WXS": 5, + "Targeted-Capture": 3, + "miRNA-Seq": 3, + "CNV Genotypes": 2 + } + ] + } + } """ - res = await datastore.get_commons_attribute(name, "tags") + res = await datastore.get_all_tags() if res: return res else: raise HTTPException( HTTP_404_NOT_FOUND, - {"message": f"no common exists with the given: {name}", "code": 404}, + {"message": f"error retrieving tags from service", "code": 404}, ) @mod.get("/aggregate/metadata/{name}/info") -async def metadata_info(name: str): +async def get_aggregate_metadata_commons_info(name: str): """ Returns information from the named commons. - """ - res = await datastore.get_commons_attribute(name, "info") - if res: - return res - else: - raise HTTPException( - HTTP_404_NOT_FOUND, - {"message": f"no common exists with the given: {name}", "code": 404}, - ) + Example: -@mod.get("/aggregate/metadata/{name}/aggregations") -async def metadata_aggregations(name: str): - res = await datastore.get_aggregations(name) + { commons_url: "gen3.datacommons.io" } + + """ + res = await datastore.get_commons_attribute(name) if res: return res else: @@ -99,8 +206,13 @@ async def metadata_aggregations(name: str): @mod.get("/aggregate/metadata/guid/{guid:path}") -async def metadata_name_guid(guid: str): - """Get the metadata of the GUID in the named commons.""" +async def get_aggregate_metadata_guid(guid: str): + """Returns a metadata record by GUID + + Example: + + { id2: { name: "bear" } } + """ res = await datastore.get_by_guid(guid) if res: return res @@ -116,4 +228,4 @@ async def metadata_name_guid(guid: str): def init_app(app): if config.USE_AGG_MDS: - app.include_router(mod, tags=["Query"]) + app.include_router(mod, tags=["Aggregate"]) diff --git a/src/mds/config.py b/src/mds/config.py index 06b7d16e..a3d0c7d8 100644 --- a/src/mds/config.py +++ b/src/mds/config.py @@ -59,6 +59,9 @@ def __init__(self, value): DB_RETRY_LIMIT = config("DB_RETRY_LIMIT", cast=int, default=DB_CONNECT_RETRIES) DB_RETRY_INTERVAL = config("DB_RETRY_INTERVAL", cast=int, default=1) +# Elasticsearch +ES_RETRY_INTERVAL = config("ES_RETRY_INTERVAL", cast=int, default=20) +ES_RETRY_LIMIT = config("ES_RETRY_LIMIT", cast=int, default=5) # Authz string DEFAULT_AUTHZ_STR = config( diff --git a/src/mds/populate.py b/src/mds/populate.py index 1816dbcf..71b3f24f 100644 --- a/src/mds/populate.py +++ b/src/mds/populate.py @@ -1,15 +1,14 @@ import asyncio from argparse import Namespace -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from mds.agg_mds import datastore, adapters from mds.agg_mds.mds import pull_mds -from mds.agg_mds.commons import MDSInstance, AdapterMDSInstance, Commons, parse_config +from mds.agg_mds.commons import MDSInstance, ColumnsToFields, Commons, parse_config from mds import config, logger from pathlib import Path from urllib.parse import urlparse import argparse import sys -import json def parse_args(argv: List[str]) -> Namespace: @@ -20,15 +19,11 @@ def parse_args(argv: List[str]) -> Namespace: parser = argparse.ArgumentParser() parser.add_argument("--config", help="config file to use", type=str, required=True) - parser.add_argument( - "--hostname", help="hostname of server", type=str, default="localhost" - ) - 
parser.add_argument("--port", help="port of server", type=int, default=6379) known_args, unknown_args = parser.parse_known_args(argv) return known_args -async def populate_metadata(name: str, common, results): +async def populate_metadata(name: str, common, results, use_temp_index=False): mds_arr = [{k: v} for k, v in results.items()] total_items = len(mds_arr) @@ -58,16 +53,26 @@ def normalize(entry: dict) -> Any: for column, field in common.columns_to_fields.items(): if field == column: continue - if field in entry[common.study_data_field]: - entry[common.study_data_field][column] = entry[ - common.study_data_field - ][field] + if isinstance(field, ColumnsToFields): + entry[common.study_data_field][column] = field.get_value( + entry[common.study_data_field] + ) + else: + if field in entry[common.study_data_field]: + entry[common.study_data_field][column] = entry[ + common.study_data_field + ][field] return entry entry = normalize(entry) - # add the common field and url to the entry - entry[common.study_data_field]["commons_name"] = name + # add the common field, selecting the name or an override (i.e. commons_name) and url to the entry + + entry[common.study_data_field]["commons_name"] = ( + common.commons_name + if hasattr(common, "commons_name") and common.commons_name is not None + else name + ) # add to tags for t in entry[common.study_data_field].get("tags", {}): @@ -84,16 +89,58 @@ def normalize(entry: dict) -> Any: keys = list(results.keys()) info = {"commons_url": common.commons_url} + await datastore.update_metadata( - name, mds_arr, keys, tags, info, common.study_data_field + name, mds_arr, keys, tags, info, common.study_data_field, use_temp_index ) -async def main(commons_config: Commons, hostname: str, port: int) -> None: +async def populate_info(commons_config: Commons, use_temp_index=False) -> None: + agg_info = { + key: value.to_dict() for key, value in commons_config.aggregations.items() + } + await datastore.update_global_info("aggregations", agg_info, use_temp_index) + + if commons_config.configuration.schema: + json_schema = { + k: v.to_schema(all_fields=True) + for k, v in commons_config.configuration.schema.items() + } + await datastore.update_global_info("schema", json_schema, use_temp_index) + await populate_drs_info(commons_config, use_temp_index) + + +async def populate_drs_info(commons_config: Commons, use_temp_index=False) -> None: + if commons_config.configuration.settings.cache_drs: + server = commons_config.configuration.settings.drs_indexd_server + if server is not None: + drs_data = adapters.get_metadata("drs_indexd", server, None) + + for id, entry in drs_data.get("cache", {}).items(): + await datastore.update_global_info(id, entry, use_temp_index) + + +async def populate_config(commons_config: Commons, use_temp_index=False) -> None: + array_definition = { + "array": [ + field + for field, value in commons_config.configuration.schema.items() + if value.type == "array" + ] + } + + await datastore.update_config_info(array_definition, use_temp_index) + + +async def main(commons_config: Commons) -> None: """ Given a config structure, pull all metadata from each one in the config and cache into the following structure: { + "configuration" : { + schema: { dict of data schema for normalized fields } + settings: { dict of additional configuration properties } + }, "commons_name" : { "metadata" : [ array of metadata entries ], "field_mapping" : { dictionary of field_name to column_name }, @@ -106,36 +153,83 @@ async def main(commons_config: Commons, hostname: str, 
port: int) -> None: """ if not config.USE_AGG_MDS: - print("aggregate MDS disabled") + logger.info("aggregate MDS disabled") exit(1) url_parts = urlparse(config.ES_ENDPOINT) await datastore.init(hostname=url_parts.hostname, port=url_parts.port) - await datastore.drop_all() - - for name, common in commons_config.gen3_commons.items(): - logger.info(f"Populating {name} using Gen3 MDS connector") - results = pull_mds(common.mds_url, common.guid_type) - logger.info(f"Received {len(results)} from {name}") - if len(results) > 0: - await populate_metadata(name, common, results) - - for name, common in commons_config.adapter_commons.items(): - logger.info(f"Populating {name} using adapter: {common.adapter}") - results = adapters.get_metadata( - common.adapter, - common.mds_url, - common.filters, - common.config, - common.field_mappings, - common.per_item_values, - common.keep_original_fields, - common.global_field_filters, + + # build mapping table for commons index + + field_mapping = { + "mappings": { + "commons": { + "properties": { + k: v.to_schema(True) + for k, v in commons_config.configuration.schema.items() + } + } + } + } + + await datastore.drop_all_temp_indexes() + await datastore.create_temp_indexes(commons_mapping=field_mapping) + + mdsCount = 0 + try: + for name, common in commons_config.gen3_commons.items(): + logger.info(f"Populating {name} using Gen3 MDS connector") + results = pull_mds(common.mds_url, common.guid_type) + logger.info(f"Received {len(results)} from {name}") + if len(results) > 0: + mdsCount += len(results) + await populate_metadata(name, common, results, use_temp_index=True) + + for name, common in commons_config.adapter_commons.items(): + logger.info(f"Populating {name} using adapter: {common.adapter}") + results = adapters.get_metadata( + common.adapter, + common.mds_url, + common.filters, + common.config, + common.field_mappings, + common.per_item_values, + common.keep_original_fields, + common.global_field_filters, + schema=commons_config.configuration.schema, + ) + logger.info(f"Received {len(results)} from {name}") + if len(results) > 0: + mdsCount += len(results) + await populate_metadata(name, common, results, use_temp_index=True) + + if mdsCount == 0: + raise ValueError("Could not obtain any metadata from any adapters.") + + # populate global information index + await populate_info(commons_config, use_temp_index=True) + # populate array index information to support guppy + await populate_config(commons_config, use_temp_index=True) + + except Exception as ex: + logger.error( + "Error occurred during mds population. Existing indexes are left in place." ) - logger.info(f"Received {len(results)} from {name}") - if len(results) > 0: - await populate_metadata(name, common, results) + logger.error(ex) + raise ex + + logger.info(f"Temp indexes populated successfully. 
Proceeding to clone") + # All temp indexes created without error, drop current real index, clone temp to real index and then drop temp index + try: + await datastore.drop_all_non_temp_indexes() # TODO: rename indexes to old + await datastore.create_indexes(commons_mapping=field_mapping) + await datastore.clone_temp_indexes_to_real_indexes() + await datastore.drop_all_temp_indexes() + except Exception as ex: + logger.error("Error occurred during cloning.") + logger.error(ex) + raise ex res = await datastore.get_status() print(res) @@ -176,9 +270,15 @@ async def filter_entries( return filtered -def parse_config_from_file(path: Path) -> Commons: - with open(path, "rt") as infile: - return parse_config(json.load(infile)) +def parse_config_from_file(path: Path) -> Optional[Commons]: + if not path.exists(): + logger.error(f"configuration file: {path} does not exist") + return None + try: + return parse_config(path.read_text()) + except IOError as ex: + logger.error(f"cannot read configuration file {path}: {ex}") + raise ex if __name__ == "__main__": @@ -187,4 +287,4 @@ def parse_config_from_file(path: Path) -> Commons: """ args: Namespace = parse_args(sys.argv) commons = parse_config_from_file(Path(args.config)) - asyncio.run(main(commons_config=commons, hostname=args.hostname, port=args.port)) + asyncio.run(main(commons_config=commons)) diff --git a/tests/test_agg_mds_adapters.py b/tests/test_agg_mds_adapters.py index 2f12b584..6668123e 100644 --- a/tests/test_agg_mds_adapters.py +++ b/tests/test_agg_mds_adapters.py @@ -1,11 +1,37 @@ from more_itertools import side_effect import respx import json -from mds.agg_mds.adapters import get_metadata, get_json_path_value +from mds.agg_mds.adapters import ( + get_metadata, + get_json_path_value, + strip_email, + strip_html, + add_icpsr_source_url, + FieldFilters, + get_json_path_value, +) from tenacity import RetryError, wait_none import httpx +def test_filters_with_bad_entries(): + assert strip_email(100) == 100 + assert strip_html(99) == 99 + assert add_icpsr_source_url(77) == 77 + + +def test_non_existing_filters(): + assert FieldFilters().execute("nofilter", "passthru") == "passthru" + + +def test_json_path(): + assert get_json_path_value(None, {}) is None + assert get_json_path_value("shark", {"shark": ["great", "white"]}) == [ + "great", + "white", + ] + + @respx.mock def test_get_metadata_icpsr(): xml_response = """ @@ -323,6 +349,70 @@ def test_get_metadata_icpsr(): assert isinstance(exc, RetryError) == True +@respx.mock +def test_drs_indexd(): + json_data = [ + { + "hints": [".*dg\\.XXTS.*"], + "host": "https://mytest1.commons.io/", + "name": "DataSTAGE", + "type": "indexd", + }, + { + "hints": [".*dg\\.TSXX.*"], + "host": "https://commons2.io/index/", + "name": "Environmental DC", + "type": "indexd", + }, + ] + + expected = { + "info": {"created": "07/07/2022 15:28:46:UTC"}, + "cache": { + "dg.XXTS": { + "host": "mytest1.commons.io", + "name": "DataSTAGE", + "type": "indexd", + }, + "dg.TSXX": { + "host": "commons2.io", + "name": "Environmental DC", + "type": "indexd", + }, + }, + } + + respx.get("http://test/index/_dist").mock( + return_value=httpx.Response( + status_code=200, + json=json_data, + ) + ) + + results = get_metadata( + "drs_indexd", + "http://test", + filters=None, + ) + + assert results["cache"] == expected["cache"] + + respx.get("http://test/index/_dist").mock( + return_value=httpx.Response( + status_code=404, + json=None, + ) + ) + + results = get_metadata( + "drs_indexd", + "http://test", + filters=None, + ) + + assert 
results == {"results": {}} + + @respx.mock def test_get_metadata_clinicaltrials(): json_response = r"""{ @@ -3004,7 +3094,7 @@ def test_get_metadata_mps(): "authz": "", "sites": "", "summary": "path:description", - "study_url": "path:url", + "study_url": {"path": "url", "default": ""}, "location": "path:data_group", "subjects": "", "__manifest": "", @@ -3013,7 +3103,7 @@ def test_get_metadata_mps(): "institutions": "path:data_group", "year_awarded": "", "investigators": "path:data_group", - "project_title": "path:title", + "project_title": {"path": "title", "default": ""}, "protocol_name": "", "study_summary": "", "_file_manifest": "", @@ -3289,7 +3379,7 @@ def test_gen3_adapter(): "gen3_discovery": { "tags": [{"name": "Array", "category": "Data Type"}], "_subjects_count": 48, - "dbgap_accession_number": "", + "dbgap_accession_number": None, "study_description": "The molecular factors involved in the development of Post-traumatic Stress Disorder (PTSD) remain poorly understood. Previous transcriptomic studies investigating the mechanisms of PTSD apply targeted approaches to identify individual genes under a cross-sectional framework lack a holistic view of the behaviours and properties of these genes at the system-level. Here we sought to apply an unsupervised gene-network-based approach to a prospective experimental design using whole-transcriptome RNA-Seq gene expression from peripheral blood leukocytes of U.S. Marines (N=188), obtained both pre- and post-deployment to conflict zones. We identified discrete groups of co-regulated genes (i.e., co-expression modules) and tested them for association to PTSD. We identified one module at both pre- and post-deployment containing putative causal signatures for PTSD development displaying an over-expression of genes enriched for functions of innate-immune response and interferon signalling (Type-I and Type-II). Importantly, these results were replicated in a second non-overlapping independent dataset of U.S. Marines (N=96), further outlining the role of innate immune and interferon signalling genes within co-expression modules to explain at least part of the causal pathophysiology for PTSD development. A second module, consequential of trauma exposure, contained PTSD resiliency signatures and an over-expression of genes involved in hemostasis and wound responsiveness suggesting that chronic levels of stress impair proper wound healing during/after exposure to the battlefield while highlighting the role of the hemostatic system as a clinical indicator of chronic-based stress. 
These findings provide novel insights for early preventative measures and advanced PTSD detection, which may lead to interventions that delay or perhaps abrogate the development of PTSD.\nWe used microarrays to characterize both prognostic and diagnostic molecular signatures associated to PTSD risk and PTSD status compared to control subjects.", "number_of_datafiles": 0, "investigator": "me.foo@smartsite.com", @@ -3365,15 +3455,18 @@ def test_gen3_adapter(): per_item_override = {"GSE63878": {"dbgap_accession_number": "dg.333344.222"}} - get_metadata( - "gen3", - "http://test/ok/", - None, - config={"batchSize": 64}, - mappings=field_mappings, - keepOriginalFields=False, - perItemValues=per_item_override, - ) == expected + assert ( + get_metadata( + "gen3", + "http://test/ok/", + None, + config={"batchSize": 64}, + mappings=field_mappings, + keepOriginalFields=False, + perItemValues=per_item_override, + ) + == expected + ) respx.get( "http://test/error/mds/metadata?data=True&_guid_type=discovery_metadata&limit=1000&offset=0" @@ -3400,7 +3493,7 @@ def test_gen3_adapter(): get_metadata("gen3", "http://test/timeouterror/", None, field_mappings) except Exception as exc: - assert isinstance(exc, RetryError) == True + assert isinstance(exc, RetryError) @respx.mock @@ -3642,10 +3735,10 @@ def test_get_metadata_harvard_dataverse(): "originalFileName": "us_metro_confirmed_cases_cdl.csv", "UNF": "UNF:6:w715RbMgdXAjmDiwdGNv+g==", "rootDataFileId": -1, - "md5": "ef0d67774caa8f1bcd7bcce4e8d62396", + "md5": "ef0d6777", "checksum": { "type": "MD5", - "value": "ef0d67774caa8f1bcd7bcce4e8d62396" + "value": "ef0d67774" }, "creationDate": "2022-05-17" } @@ -3892,10 +3985,10 @@ def test_get_metadata_harvard_dataverse(): "originalFileName": "us_metro_confirmed_cases_cdl.csv", "UNF": "UNF:6:w715RbMgdXAjmDiwdGNv+g==", "rootDataFileId": -1, - "md5": "ef0d67774caa8f1bcd7bcce4e8d62396", + "md5": "ef0d6777", "checksum": { "type": "MD5", - "value": "ef0d67774caa8f1bcd7bcce4e8d62396" + "value": "ef0d6777" }, "creationDate": "2022-05-17" } @@ -4349,6 +4442,19 @@ def test_get_metadata_harvard_dataverse(): == {} ) + respx.get( + "http://test/ok/datasets/:persistentId/?persistentId=doi:10.7910/DVN/5B8YM8" + "http://test/ok/datasets/:persistentId/?persistentId=doi:10.7910/DVN/5B8YM8" + ).mock( + return_value=httpx.Response( + status_code=200, json=json.loads(dataset_json_response) + ) + ) + + respx.get("http://test/ok/access/datafile/6297263/metadata/ddi").mock( + return_value=httpx.Response(status_code=200, text=file_ddi_response) + ) + # valid call assert ( get_metadata( @@ -4445,7 +4551,7 @@ def test_get_metadata_harvard_dataverse(): mappings=field_mappings, ) except Exception as exc: - assert isinstance(exc, RetryError) == True + assert isinstance(exc, RetryError) is True def test_missing_adapter(): @@ -4476,10 +4582,10 @@ def test_json_path_expression(): ) # test non existent path - assert get_json_path_value("study2.study_description_summary", sample1) == "" + assert get_json_path_value("study2.study_description_summary", sample1) is None # test bad path - assert get_json_path_value(".contributors", sample1) == "" + assert get_json_path_value(".contributors", sample1) is None # test single array assert get_json_path_value("study1.contributors", sample1) == ["Bilbo Baggins"] diff --git a/tests/test_agg_mds_commons.py b/tests/test_agg_mds_commons.py index 7994b113..807d2d7d 100644 --- a/tests/test_agg_mds_commons.py +++ b/tests/test_agg_mds_commons.py @@ -5,14 +5,229 @@ from mds.agg_mds.commons import ( parse_config, 
Commons, + Config, + Settings, + ColumnsToFields, + FieldDefinition, MDSInstance, AdapterMDSInstance, ) +def test_convert_tp_schema(): + schema = FieldDefinition( + type="object", + properties={ + "_subjects_count": FieldDefinition(type="integer"), + "year_awarded": FieldDefinition(type="integer", default=2000), + "__manifest": FieldDefinition( + type="array", + properties={ + "file_name": FieldDefinition(type="string"), + "file_size": FieldDefinition(type="integer"), + }, + ), + "tags": FieldDefinition(type="array"), + "study_description": FieldDefinition(type="string"), + "short_name": FieldDefinition(type="string"), + "full_name": FieldDefinition(type="string"), + "_unique_id": FieldDefinition(type="string"), + "study_id": FieldDefinition(type="string"), + "study_url": FieldDefinition(type="string"), + "commons_url": FieldDefinition(type="string"), + "authz": FieldDefinition(type="string"), + }, + ) + + converted = schema.to_schema(True) + + expected = { + "properties": { + "__manifest": { + "properties": { + "file_name": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "file_size": {"type": "long"}, + }, + "type": "nested", + }, + "_subjects_count": {"type": "long"}, + "_unique_id": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "authz": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "commons_url": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "full_name": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "short_name": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "study_description": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "study_id": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "study_url": { + "fields": { + "analyzed": { + "analyzer": "ngram_analyzer", + "search_analyzer": "search_analyzer", + "term_vector": "with_positions_offsets", + "type": "text", + } + }, + "type": "keyword", + }, + "tags": {"type": "nested"}, + "year_awarded": { + "type": "long", + }, + }, + "type": "nested", + } + assert converted == expected + + converted = schema.to_schema(False, True) + + expected = { + "type": "object", + "properties": { + "_subjects_count": {"type": "integer", "description": ""}, + "year_awarded": {"type": "integer", "description": "", "default": 2000}, + "__manifest": { + "type": "array", + "properties": { + "file_name": {"type": "string", "description": ""}, + 
"file_size": {"type": "integer", "description": ""}, + }, + "description": "", + }, + "tags": {"type": "array", "description": ""}, + "study_description": {"type": "string", "description": ""}, + "short_name": {"type": "string", "description": ""}, + "full_name": {"type": "string", "description": ""}, + "_unique_id": {"type": "string", "description": ""}, + "study_id": {"type": "string", "description": ""}, + "study_url": {"type": "string", "description": ""}, + "commons_url": {"type": "string", "description": ""}, + "authz": {"type": "string", "description": ""}, + }, + "description": "", + } + + assert converted == expected + + def test_parse_config(): - assert parse_config( + results = parse_config( + """ { + "configuration": { + "schema": { + "_subjects_count": { + "type": "integer" + }, + "year_awarded": { + "type": "integer" + }, + "__manifest": { + "type": "array", + "properties": { + "file_name": { + "type": "string" + }, + "file_size": { + "type": "integer" + } + } + }, + "tags": { + "type": "array" + }, + "study_description": {}, + "short_name": {}, + "full_name": {}, + "_unique_id": {}, + "study_id": {}, + "study_url": {}, + "commons_url": {}, + "authz": { + "type": "string" + } + } + }, + "gen3_commons": { "my_gen3_commons": { "mds_url": "http://mds", @@ -23,19 +238,44 @@ def test_parse_config(): "_subjects_count": "_subjects_count", "study_id": "study_id", "_unique_id": "_unique_id", - "study_description": "study_description", - }, + "study_description": "study_description" + } } }, "adapter_commons": { "non_gen3_commons": { "mds_url": "http://non-gen3", "commons_url": "non-gen3", - "adapter": "icpsr", + "adapter": "icpsr" } - }, + } } - ) == Commons( + """ + ) + expected = Commons( + configuration=Config( + settings=Settings(), + schema={ + "_subjects_count": FieldDefinition(type="integer"), + "year_awarded": FieldDefinition(type="integer"), + "__manifest": FieldDefinition( + type="array", + properties={ + "file_name": FieldDefinition(type="string"), + "file_size": FieldDefinition(type="integer"), + }, + ), + "tags": FieldDefinition(type="array"), + "study_description": FieldDefinition(type="string"), + "short_name": FieldDefinition(type="string"), + "full_name": FieldDefinition(type="string"), + "_unique_id": FieldDefinition(type="string"), + "study_id": FieldDefinition(type="string"), + "study_url": FieldDefinition(type="string"), + "commons_url": FieldDefinition(type="string"), + "authz": FieldDefinition(type="string"), + }, + ), gen3_commons={ "my_gen3_commons": MDSInstance( "http://mds", @@ -58,3 +298,70 @@ def test_parse_config(): ) }, ) + + assert expected == results + + +def test_normalization(): + val = FieldDefinition(type="integer") + assert val.normalize_value("100") == 100 + assert val.normalize_value("bear") is None + + val = FieldDefinition(type="number") + assert val.normalize_value("1.23") == 1.23 + assert val.normalize_value("bear") is None + + val = FieldDefinition(type="array") + assert val.normalize_value("1.23") == ["1.23"] + assert val.normalize_value({"foo": "bar"}) == [{"foo": "bar"}] + + val = FieldDefinition(type="array") + assert val.normalize_value(None) is None + + val = FieldDefinition(type="array") + assert val.normalize_value("") == [] + + val = FieldDefinition(type="object") + assert val.normalize_value('{"foo" : "bar"}') == {"foo": "bar"} + + val = FieldDefinition(type="object") + assert val.normalize_value("bear") is None + + val = FieldDefinition(type="string") + assert val.normalize_value("hello") == "hello" + + val = 
FieldDefinition(type="bar")
+    assert val.normalize_value("hello") == "hello"
+
+    val = FieldDefinition(type="string")
+    assert val.has_default_value() is False
+
+    val = FieldDefinition(type="string", default="hi")
+    assert val.has_default_value() is True
+
+    val = FieldDefinition(type="string")
+    assert val.normalize_value(["hello", "how", "are", "you"]) == "hellohowareyou"
+    assert val.normalize_value(None) is None
+
+
+def test_mds_instance():
+    val = MDSInstance(
+        mds_url="https://test",
+        commons_url="http://commons.io",
+    )
+    assert val.columns_to_fields is None
+
+    val = MDSInstance(
+        mds_url="https://test",
+        commons_url="http://commons.io",
+        columns_to_fields={
+            "val1": "path:root",
+            "value2": {"name": "value1", "default": "bear"},
+        },
+    )
+
+    assert val.columns_to_fields is not None
+
+    val = ColumnsToFields(name="test", default="bear")
+    assert val.get_value({"test": "fox"}) == "fox"
+    assert val.get_value({"b": "s"}) == "bear"
diff --git a/tests/test_agg_mds_datastore.py b/tests/test_agg_mds_datastore.py
index 215a9b97..411d4320 100644
--- a/tests/test_agg_mds_datastore.py
+++ b/tests/test_agg_mds_datastore.py
@@ -16,10 +16,38 @@ async def test_init():
 @pytest.mark.asyncio
-async def test_drop_all():
+async def test_drop_all_non_temp_indexes():
     with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
-        await datastore.drop_all()
-        mock_client.drop_all.assert_called_with()
+        await datastore.drop_all_non_temp_indexes()
+        mock_client.drop_all_non_temp_indexes.assert_called_with()
+
+
+@pytest.mark.asyncio
+async def test_drop_all_temp_indexes():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
+        await datastore.drop_all_temp_indexes()
+        mock_client.drop_all_temp_indexes.assert_called_with()
+
+
+@pytest.mark.asyncio
+async def test_create_indexes():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
+        await datastore.create_indexes("{}")
+        mock_client.create_indexes.assert_called_with("{}")
+
+
+@pytest.mark.asyncio
+async def test_create_temp_indexes():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
+        await datastore.create_temp_indexes("{}")
+        mock_client.create_temp_indexes.assert_called_with("{}")
+
+
+@pytest.mark.asyncio
+async def test_clone_temp_indexes_to_real_indexes():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
+        await datastore.clone_temp_indexes_to_real_indexes()
+        mock_client.clone_temp_indexes_to_real_indexes.assert_called_with()


 @pytest.mark.asyncio
@@ -43,6 +71,20 @@ async def test_update_metadata():
         mock_client.update_metadata.assert_called_with()


+@pytest.mark.asyncio
+async def test_update_global_info():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
+        await datastore.update_global_info()
+        mock_client.update_global_info.assert_called_with()
+
+
+@pytest.mark.asyncio
+async def test_update_config_info():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
+        await datastore.update_config_info()
+        mock_client.update_config_info.assert_called_with()
+
+
 @pytest.mark.asyncio
 async def test_get_commons_metadata():
     with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
@@ -57,6 +99,13 @@ async def test_get_all_named_commons_metadata():
         mock_client.get_all_named_commons_metadata.assert_called_with()


+@pytest.mark.asyncio
+async def test_get_all_tags():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client:
+        await datastore.get_all_tags()
+
mock_client.metadata_tags.assert_called_with() + + @pytest.mark.asyncio async def test_get_by_guid(): with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_client: diff --git a/tests/test_agg_mds_elasticsearch_dao.py b/tests/test_agg_mds_elasticsearch_dao.py index c87e407c..e5637f21 100644 --- a/tests/test_agg_mds_elasticsearch_dao.py +++ b/tests/test_agg_mds_elasticsearch_dao.py @@ -1,8 +1,40 @@ from unittest.mock import patch, call, MagicMock import pytest from mds.agg_mds.datastore import elasticsearch_dao -from mds.agg_mds.datastore.elasticsearch_dao import MAPPING -from elasticsearch import Elasticsearch, exceptions as es_exceptions +from mds.agg_mds.datastore.elasticsearch_dao import ( + INFO_MAPPING, + AGG_MDS_INDEX, + AGG_MDS_INFO_INDEX, + AGG_MDS_CONFIG_INDEX, + CONFIG, + SEARCH_CONFIG, + AGG_MDS_INDEX_TEMP, + AGG_MDS_INFO_INDEX_TEMP, + AGG_MDS_CONFIG_INDEX_TEMP, + AGG_MDS_INFO_TYPE, + count, + process_record, +) +from elasticsearch import exceptions as es_exceptions +from mds.config import ES_RETRY_LIMIT, ES_RETRY_INTERVAL + +COMMON_MAPPING = { + "mappings": { + "commons": { + "properties": { + "__manifest": { + "type": "nested", + }, + "tags": { + "type": "nested", + }, + "data_dictionary": { + "type": "nested", + }, + } + } + } +} @pytest.mark.asyncio @@ -15,29 +47,127 @@ async def test_init(): ["myhost"], port=9200, scheme="http", - timeout=30, - max_retries=7, + timeout=ES_RETRY_INTERVAL, + max_retries=ES_RETRY_LIMIT, retry_on_timeout=True, ) @pytest.mark.asyncio -async def test_drop_all(): +async def test_drop_all_non_temp_indexes(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.indices", + MagicMock(), + ) as mock_indices: + await elasticsearch_dao.drop_all_non_temp_indexes() + mock_indices.delete.assert_has_calls( + [ + call(index=AGG_MDS_INDEX, ignore=[400, 404]), + call(index=AGG_MDS_INFO_INDEX, ignore=[400, 404]), + call(index=AGG_MDS_CONFIG_INDEX, ignore=[400, 404]), + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_drop_all_temp_indexes(): with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.indices", MagicMock(), ) as mock_indices: - await elasticsearch_dao.drop_all() + await elasticsearch_dao.drop_all_temp_indexes() mock_indices.delete.assert_has_calls( [ - call(index="default_namespace-commons-index", ignore=[400, 404]), - call(index="default_namespace-commons-info-index", ignore=[400, 404]), - ] + call(index=AGG_MDS_INDEX_TEMP, ignore=[400, 404]), + call(index=AGG_MDS_INFO_INDEX_TEMP, ignore=[400, 404]), + call(index=AGG_MDS_CONFIG_INDEX_TEMP, ignore=[400, 404]), + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_clone_temp_indexes_to_real_indexes(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.Elasticsearch", + MagicMock(), + ) as mock_es: + await elasticsearch_dao.clone_temp_indexes_to_real_indexes() + mock_es.reindex.assert_has_calls( + [ + call( + elasticsearch_dao.elastic_search_client, + { + "source": {"index": AGG_MDS_INDEX_TEMP}, + "dest": {"index": AGG_MDS_INDEX}, + }, + ), + call( + elasticsearch_dao.elastic_search_client, + { + "source": {"index": AGG_MDS_INFO_INDEX_TEMP}, + "dest": {"index": AGG_MDS_INFO_INDEX}, + }, + ), + call( + elasticsearch_dao.elastic_search_client, + { + "source": {"index": AGG_MDS_CONFIG_INDEX_TEMP}, + "dest": {"index": AGG_MDS_CONFIG_INDEX}, + }, + ), + ], + any_order=True, ) + + +@pytest.mark.asyncio +async def test_create_indexes(): + with patch( + 
"mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.indices", + MagicMock(), + ) as mock_indices: + await elasticsearch_dao.create_indexes(common_mapping=COMMON_MAPPING) + mock_indices.create.assert_has_calls( + [ + call(body={**SEARCH_CONFIG, **COMMON_MAPPING}, index=AGG_MDS_INDEX), + call(body=INFO_MAPPING, index=AGG_MDS_INFO_INDEX), + call(body=CONFIG, index=AGG_MDS_CONFIG_INDEX), + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_create_indexes(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.indices", + MagicMock(), + ) as mock_indices: + await elasticsearch_dao.create_indexes(common_mapping=COMMON_MAPPING) mock_indices.create.assert_has_calls( [ - call(body=MAPPING, index="default_namespace-commons-index"), - call(index="default_namespace-commons-info-index"), + call(body={**SEARCH_CONFIG, **COMMON_MAPPING}, index=AGG_MDS_INDEX), + call(body=INFO_MAPPING, index=AGG_MDS_INFO_INDEX), + call(body=CONFIG, index=AGG_MDS_CONFIG_INDEX), + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_create_temp_indexes(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.indices", + MagicMock(), + ) as mock_indices: + await elasticsearch_dao.create_temp_indexes(common_mapping=COMMON_MAPPING) + mock_indices.create.assert_has_calls( + [ + call(body={**SEARCH_CONFIG, **COMMON_MAPPING}, index=AGG_MDS_INDEX_TEMP), + call(body=INFO_MAPPING, index=AGG_MDS_INFO_INDEX_TEMP), + call(body=CONFIG, index=AGG_MDS_CONFIG_INDEX_TEMP), ], any_order=True, ) @@ -52,21 +182,21 @@ async def test_create_if_exists(): 400, "resource_already_exists_exception" ) ), - ) as mock_indices: - await elasticsearch_dao.drop_all() + ): + await elasticsearch_dao.drop_all_non_temp_indexes() + await elasticsearch_dao.create_indexes(COMMON_MAPPING) @pytest.mark.asyncio async def test_create_index_raise_exception(): - with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.indices.create", MagicMock(side_effect=es_exceptions.RequestError(403, "expect_to_fail")), - ) as mock_indices: + ): try: - await elasticsearch_dao.drop_all() + await elasticsearch_dao.create_indexes(common_mapping=COMMON_MAPPING) except Exception as exc: - assert isinstance(exc, es_exceptions.RequestError) == True + assert isinstance(exc, es_exceptions.RequestError) is True @pytest.mark.asyncio @@ -82,7 +212,7 @@ async def test_update_metadata(): "my_id": { "gen3_discovery": { "some_field": "some_value", - "__manifest": "{}", + "__manifest": {}, "sites": "", } } @@ -96,18 +226,117 @@ async def test_update_metadata(): mock_index.assert_has_calls( [ call( - index="default_namespace-commons-info-index", + body={}, doc_type="commons-info", id="my_commons", + index=AGG_MDS_INFO_INDEX, + ), + call( + body={"some_field": "some_value", "__manifest": {}, "sites": ""}, + doc_type="commons", + id="my_id", + index=AGG_MDS_INDEX, + ), + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_update_metadata_to_temp_index(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.index", + MagicMock(), + ) as mock_index: + await elasticsearch_dao.update_metadata( + "my_commons", + [ + { + "my_id": { + "gen3_discovery": { + "some_field": "some_value", + "__manifest": {}, + "sites": "", + } + } + } + ], + [], + {}, + {}, + "gen3_discovery", + use_temp_index=True, + ) + mock_index.assert_has_calls( + [ + call( body={}, + doc_type="commons-info", + id="my_commons", + index=AGG_MDS_INFO_INDEX_TEMP, ), call( - 
index="default_namespace-commons-index", + body={"some_field": "some_value", "__manifest": {}, "sites": ""}, doc_type="commons", id="my_id", - body={"some_field": "some_value", "__manifest": {}, "sites": None}, + index=AGG_MDS_INDEX_TEMP, ), - ] + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_update_global_info(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client", + MagicMock(), + ) as mock_client: + await elasticsearch_dao.update_global_info(key="123", doc={}) + + mock_client.index.assert_called_with( + index=AGG_MDS_INFO_INDEX, doc_type=AGG_MDS_INFO_TYPE, id="123", body={} + ) + + +@pytest.mark.asyncio +async def test_update_global_info_to_temp_index(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client", + MagicMock(), + ) as mock_client: + await elasticsearch_dao.update_global_info( + key="123", doc={}, use_temp_index=True + ) + + mock_client.index.assert_called_with( + index=AGG_MDS_INFO_INDEX_TEMP, doc_type=AGG_MDS_INFO_TYPE, id="123", body={} + ) + + +@pytest.mark.asyncio +async def test_update_config_info(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client", + MagicMock(), + ) as mock_client: + await elasticsearch_dao.update_config_info(doc={}) + + mock_client.index.assert_called_with( + index=AGG_MDS_CONFIG_INDEX, doc_type="_doc", id=AGG_MDS_INDEX, body={} + ) + + +@pytest.mark.asyncio +async def test_update_config_info_to_temp_index(): + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client", + MagicMock(), + ) as mock_client: + await elasticsearch_dao.update_config_info(doc={}, use_temp_index=True) + + mock_client.index.assert_called_with( + index=AGG_MDS_CONFIG_INDEX_TEMP, doc_type="_doc", id=AGG_MDS_INDEX, body={} ) @@ -122,6 +351,11 @@ async def test_get_status(): mock_client.ping.assert_called_with() +@pytest.mark.asyncio +async def close(): + assert True + + @pytest.mark.asyncio async def test_get_commons(): with patch( @@ -130,7 +364,7 @@ async def test_get_commons(): ) as mock_search: await elasticsearch_dao.get_commons() mock_search.assert_called_with( - index="default_namespace-commons-index", + index=AGG_MDS_INDEX, body={ "size": 0, "aggs": {"commons_names": {"terms": {"field": "commons_name.keyword"}}}, @@ -140,10 +374,44 @@ async def test_get_commons(): with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", MagicMock(side_effect=Exception("some error")), - ) as mock_search: + ): assert await elasticsearch_dao.get_commons() == [] +def test_count_dict(): + assert count({1: 2, 3: 4}) == 2 + + +def test_count_list(): + assert count([1, 2, 3]) == 3 + + +def test_count_value_number(): + assert count(123) == 123 + + +def test_count_value_string(): + assert count("imastring") == "imastring" + + +def test_count_value_none(): + assert count(None) == 0 + + +def test_process_records(): + _id = "123" + _source = {"count": [1, 2, 3, 4], "name": "my_name"} + record = {"_id": _id, "_source": _source} + rid, normalized = process_record(record, ["count"]) + assert rid == _id + assert normalized == {"count": 4, "name": "my_name"} + + # test if passed dict field is not array + rid, normalized = process_record(record, ["name"]) + assert rid == _id + assert normalized == _source + + @pytest.mark.asyncio async def test_get_all_metadata(): response = { @@ -156,14 +424,14 @@ async def test_get_all_metadata(): ) as mock_search: await elasticsearch_dao.get_all_metadata(5, 9) mock_search.assert_called_with( - 
index="default_namespace-commons-index", + index=AGG_MDS_INDEX, body={"size": 5, "from": 9, "query": {"match_all": {}}}, ) with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", MagicMock(side_effect=Exception("some error")), - ) as mock_search: + ): assert await elasticsearch_dao.get_all_metadata(5, 9) == {} @@ -174,14 +442,14 @@ async def test_get_all_named_commons_metadata(): ) as mock_client: await elasticsearch_dao.get_all_named_commons_metadata("my-commons") mock_client.search.assert_called_with( - index="default_namespace-commons-index", + index=AGG_MDS_INDEX, body={"query": {"match": {"commons_name.keyword": "my-commons"}}}, ) with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", MagicMock(side_effect=Exception("some error")), - ) as mock_search: + ): assert ( await elasticsearch_dao.get_all_named_commons_metadata("my-commons") == {} ) @@ -192,9 +460,9 @@ async def test_metadata_tags(): with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client", MagicMock() ) as mock_client: - await elasticsearch_dao.metadata_tags("my-commons") + await elasticsearch_dao.metadata_tags() mock_client.search.assert_called_with( - index="default_namespace-commons-index", + index=AGG_MDS_INDEX, body={ "size": 0, "aggs": { @@ -216,8 +484,8 @@ async def test_metadata_tags(): with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", MagicMock(side_effect=Exception("some error")), - ) as mock_search: - assert await elasticsearch_dao.metadata_tags("my-commons") == [] + ): + assert await elasticsearch_dao.metadata_tags() == [] @pytest.mark.asyncio @@ -225,20 +493,17 @@ async def test_get_commons_attribute(): with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client", MagicMock() ) as mock_client: - await elasticsearch_dao.get_commons_attribute("my-commons", "attribute") + await elasticsearch_dao.get_commons_attribute("my-commons") mock_client.search.assert_called_with( - index="default_namespace-commons-info-index", + index=AGG_MDS_INFO_INDEX, body={"query": {"terms": {"_id": ["my-commons"]}}}, ) with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", MagicMock(side_effect=Exception("some error")), - ) as mock_search: - assert ( - await elasticsearch_dao.get_commons_attribute("my-commons", "attribute") - == None - ) + ): + assert await elasticsearch_dao.get_commons_attribute("my-commons") is None @pytest.mark.asyncio @@ -248,7 +513,7 @@ async def test_get_aggregations(): ) as mock_client: await elasticsearch_dao.get_aggregations("my-commons") mock_client.search.assert_called_with( - index="default_namespace-commons-index", + index=AGG_MDS_INDEX, body={ "size": 0, "query": { @@ -263,7 +528,7 @@ async def test_get_aggregations(): with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", MagicMock(side_effect=Exception("some error")), - ) as mock_search: + ): assert await elasticsearch_dao.get_aggregations("my-commons") == [] @@ -274,7 +539,7 @@ async def test_get_by_guid(): ) as mock_client: await elasticsearch_dao.get_by_guid("my-commons") mock_client.get.assert_called_with( - index="default_namespace-commons-index", + index=AGG_MDS_INDEX, doc_type="commons", id="my-commons", ) @@ -282,5 +547,5 @@ async def test_get_by_guid(): with patch( "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.get", MagicMock(side_effect=Exception("some error")), - ) as mock_get: - assert await elasticsearch_dao.get_by_guid("my-commons") == None + ): 
+ assert await elasticsearch_dao.get_by_guid("my-commons") is None diff --git a/tests/test_agg_mds_query.py b/tests/test_agg_mds_query.py index 51b7c381..1e34d75d 100644 --- a/tests/test_agg_mds_query.py +++ b/tests/test_agg_mds_query.py @@ -2,10 +2,9 @@ import pytest import nest_asyncio from mds.agg_mds import datastore -from unittest.mock import patch +from unittest.mock import patch, MagicMock from conftest import AsyncMock - # https://github.com/encode/starlette/issues/440 nest_asyncio.apply() @@ -34,24 +33,26 @@ async def test_aggregate_commons(client): @pytest.mark.asyncio async def test_aggregate_metadata(client): with patch.object( - datastore, "get_all_metadata", AsyncMock(return_value=[]) + datastore, "get_all_metadata", AsyncMock(return_value={"results": []}) ) as datastore_mock: resp = client.get("/aggregate/metadata") assert resp.status_code == 200 assert resp.json() == [] - datastore.get_all_metadata.assert_called_with(20, 0) + datastore.get_all_metadata.assert_called_with(20, 0, "", False) mock_data = { - "commons1": [ - { - "study1": {}, - } - ], - "commons2": [ - { - "study2": {}, - } - ], + "results": { + "commons1": [ + { + "study1": {}, + } + ], + "commons2": [ + { + "study2": {}, + } + ], + } } with patch.object( @@ -59,8 +60,299 @@ async def test_aggregate_metadata(client): ) as datastore_mock: resp = client.get("/aggregate/metadata") assert resp.status_code == 200 + assert resp.json() == mock_data["results"] + datastore.get_all_metadata.assert_called_with(20, 0, "", False) + + +@pytest.mark.asyncio +async def test_aggregate_metadata_paged(client): + with patch.object( + datastore, "get_all_metadata", AsyncMock(return_value={"results": []}) + ) as datastore_mock: + resp = client.get("/aggregate/metadata?pagination=1&flatten=1") + assert resp.status_code == 200 + assert resp.json() == {"results": []} + datastore.get_all_metadata.assert_called_with(20, 0, "", True) + + mock_data = { + "results": [ + {"study1": {}}, + {"study2": {}}, + ], + "pagination": {"hits": 64, "offset": 0, "pageSize": 20, "pages": 4}, + } + + with patch.object( + datastore, "get_all_metadata", AsyncMock(return_value=mock_data) + ) as datastore_mock: + resp = client.get("/aggregate/metadata?pagination=1&flatten=1") + assert resp.status_code == 200 assert resp.json() == mock_data - datastore.get_all_metadata.assert_called_with(20, 0) + datastore.get_all_metadata.assert_called_with(20, 0, "", True) + + +@pytest.mark.asyncio +async def test_aggregate_metadata_paged_flat(client): + mock_data = { + "took": 3, + "timed_out": "false", + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": 161, + "max_score": 1.0, + "hits": [ + { + "_index": "default_namespace-commons-index", + "_type": "commons", + "_id": "815616c0-dfsdfjjj", + "_score": 1.0, + "_source": { + "link": "", + "tags": [ + {"name": "restricted", "category": "Access"}, + {"name": "genomic", "category": "category"}, + ], + "commons": "LI", + "_unique_id": "815616c0-c4a4-4883-9107-a05694499a36", + "dataset_code": "LI", + "brief_summary": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.", + "dataset_title": "Lorem ipsum dolor sit amet", + "samples_count": "", + "subjects_count": "", + "data_files_count": 11062, + "_subjects_count": "", + "study_description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. 
Ultricies tristique nulla aliquet enim tortor at auctor.", + "short_name": "Lorem ipsum dolor sit amet", + "full_name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "commons_name": "Lorem ipsum", + "__manifest": [ + {"filename": "foo2.txt"}, + {"filename": "foo3.txt"}, + ], + }, + } + ], + }, + } + + results = { + "pagination": {"hits": 161, "offset": 0, "pageSize": 20, "pages": 9}, + "results": [ + { + "815616c0-dfsdfjjj": { + "gen3_discovery": { + "link": "", + "tags": [ + {"name": "restricted", "category": "Access"}, + {"name": "genomic", "category": "category"}, + ], + "commons": "LI", + "_unique_id": "815616c0-c4a4-4883-9107-a05694499a36", + "dataset_code": "LI", + "brief_summary": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.", + "dataset_title": "Lorem ipsum dolor sit amet", + "samples_count": "", + "subjects_count": "", + "data_files_count": 11062, + "_subjects_count": "", + "study_description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ultricies tristique nulla aliquet enim tortor at auctor.", + "short_name": "Lorem ipsum dolor sit amet", + "full_name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "commons_name": "Lorem ipsum", + "__manifest": [ + {"filename": "foo2.txt"}, + {"filename": "foo3.txt"}, + ], + } + } + } + ], + } + + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", + MagicMock(return_value=mock_data), + ) as search: + resp = client.get("/aggregate/metadata?flatten=true&pagination=true") + assert resp.status_code == 200 + assert resp.json() == results + + +@pytest.mark.asyncio +async def test_aggregate_metadata_counts(client): + mock_data = { + "took": 3, + "timed_out": "false", + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": 161, + "max_score": 1.0, + "hits": [ + { + "_index": "default_namespace-commons-index", + "_type": "commons", + "_id": "815616c0-dfsdfjjj", + "_score": 1.0, + "_source": { + "link": "", + "tags": [ + {"name": "restricted", "category": "Access"}, + {"name": "genomic", "category": "category"}, + ], + "commons": "LI", + "_unique_id": "815616c0-c4a4-4883-9107-a05694499a36", + "dataset_code": "LI", + "brief_summary": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.", + "dataset_title": "Lorem ipsum dolor sit amet", + "samples_count": "", + "subjects_count": "", + "data_files_count": 11062, + "_subjects_count": "", + "study_description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. 
Ultricies tristique nulla aliquet enim tortor at auctor.", + "short_name": "Lorem ipsum dolor sit amet", + "full_name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "commons_name": "Lorem ipsum", + "__manifest": [ + {"filename": "foo2.txt"}, + {"filename": "foo3.txt"}, + ], + }, + } + ], + }, + } + + results = { + "Lorem ipsum": [ + { + "815616c0-dfsdfjjj": { + "gen3_discovery": { + "link": "", + "tags": [ + {"name": "restricted", "category": "Access"}, + {"name": "genomic", "category": "category"}, + ], + "commons": "LI", + "_unique_id": "815616c0-c4a4-4883-9107-a05694499a36", + "dataset_code": "LI", + "brief_summary": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.", + "dataset_title": "Lorem ipsum dolor sit amet", + "samples_count": "", + "subjects_count": "", + "data_files_count": 11062, + "_subjects_count": "", + "study_description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ultricies tristique nulla aliquet enim tortor at auctor.", + "short_name": "Lorem ipsum dolor sit amet", + "full_name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "commons_name": "Lorem ipsum", + "__manifest": 2, + } + } + } + ] + } + + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", + MagicMock(return_value=mock_data), + ) as search: + resp = client.get("/aggregate/metadata?counts=__manifest") + assert resp.status_code == 200 + assert resp.json() == results + + # test multiple counts field + mock_data["hits"]["hits"][0]["_source"]["__manifest"] = [ + {"filename": "foo2.txt"}, + {"filename": "foo3.txt"}, + ] + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", + MagicMock(return_value=mock_data), + ) as search: + results["Lorem ipsum"][0]["815616c0-dfsdfjjj"]["gen3_discovery"]["tags"] = 2 + resp = client.get("/aggregate/metadata?counts=__manifest,tags") + assert resp.status_code == 200 + assert resp.json() == results + + +@pytest.mark.asyncio +async def test_aggregate_metadata_counts_null(client): + mock_data = { + "took": 3, + "timed_out": "false", + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": 161, + "max_score": 1.0, + "hits": [ + { + "_index": "default_namespace-commons-index", + "_type": "commons", + "_id": "815616c0-dfsdfjjj", + "_score": 1.0, + "_source": { + "link": "", + "tags": [ + {"name": "restricted", "category": "Access"}, + {"name": "genomic", "category": "category"}, + ], + "commons": "LI", + "_unique_id": "815616c0-c4a4-4883-9107-a05694499a36", + "dataset_code": "LI", + "brief_summary": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.", + "dataset_title": "Lorem ipsum dolor sit amet", + "samples_count": "", + "subjects_count": "", + "data_files_count": 11062, + "_subjects_count": "", + "study_description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. 
Ultricies tristique nulla aliquet enim tortor at auctor.", + "short_name": "Lorem ipsum dolor sit amet", + "full_name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "commons_name": "Lorem ipsum", + "__manifest": None, + }, + } + ], + }, + } + + results = { + "Lorem ipsum": [ + { + "815616c0-dfsdfjjj": { + "gen3_discovery": { + "link": "", + "tags": [ + {"name": "restricted", "category": "Access"}, + {"name": "genomic", "category": "category"}, + ], + "commons": "LI", + "_unique_id": "815616c0-c4a4-4883-9107-a05694499a36", + "dataset_code": "LI", + "brief_summary": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.", + "dataset_title": "Lorem ipsum dolor sit amet", + "samples_count": "", + "subjects_count": "", + "data_files_count": 11062, + "_subjects_count": "", + "study_description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ultricies tristique nulla aliquet enim tortor at auctor.", + "short_name": "Lorem ipsum dolor sit amet", + "full_name": "Lorem ipsum dolor sit amet, consectetur adipiscing elit", + "commons_name": "Lorem ipsum", + "__manifest": 0, + } + } + } + ] + } + + with patch( + "mds.agg_mds.datastore.elasticsearch_dao.elastic_search_client.search", + MagicMock(return_value=mock_data), + ) as search: + resp = client.get("/aggregate/metadata?counts=__manifest") + assert resp.status_code == 200 + assert resp.json() == results @pytest.mark.asyncio @@ -92,24 +384,39 @@ async def test_aggregate_metadata_name(client): @pytest.mark.asyncio async def test_aggregate_metadata_tags(client): with patch.object( - datastore, "get_commons_attribute", AsyncMock(return_value=None) + datastore, "get_all_tags", AsyncMock(return_value={}) ) as datastore_mock: - resp = client.get("/aggregate/metadata/commons1/tags") + resp = client.get("/aggregate/tags") assert resp.status_code == 404 assert resp.json() == { "detail": { "code": 404, - "message": "no common exists with the given: commons1", + "message": "error retrieving tags from service", } } + tags = { + "Access": {"total": 63, "names": [{"restricted": 63}]}, + "Category": { + "total": 61, + "names": [ + { + "Family/Twin/Trios": 39, + "Prospective Longitudinal Cohort": 10, + "Tumor vs. 
Matched-Normal": 9, + "Cross-Sectional": 3, + } + ], + }, + } + with patch.object( - datastore, "get_commons_attribute", AsyncMock(return_value=["mytag1"]) + datastore, "get_all_tags", AsyncMock(return_value=tags) ) as datastore_mock: - resp = client.get("/aggregate/metadata/commons1/tags") + resp = client.get("/aggregate/tags") assert resp.status_code == 200 - assert resp.json() == ["mytag1"] - datastore.get_commons_attribute.assert_called_with("commons1", "tags") + assert resp.json() == tags + datastore.get_all_tags.assert_called_with() @pytest.mark.asyncio @@ -125,7 +432,7 @@ async def test_aggregate_metadata_info(client): "message": "no common exists with the given: commons1", } } - datastore.get_commons_attribute.assert_called_with("commons1", "info") + datastore.get_commons_attribute.assert_called_with("commons1") with patch.object( datastore, @@ -135,23 +442,7 @@ async def test_aggregate_metadata_info(client): resp = client.get("/aggregate/metadata/commons1/info") assert resp.status_code == 200 assert resp.json() == {"commons_url": "http://commons"} - datastore.get_commons_attribute.assert_called_with("commons1", "info") - - -@pytest.mark.asyncio -async def test_metadata_aggregations(client): - with patch.object( - datastore, "get_aggregations", AsyncMock(return_value=None) - ) as datastore_mock: - resp = client.get("/aggregate/metadata/commons1/aggregations") - assert resp.status_code == 404 - assert resp.json() == { - "detail": { - "code": 404, - "message": "no common exists with the given: commons1", - } - } - datastore.get_aggregations.assert_called_with("commons1") + datastore.get_commons_attribute.assert_called_with("commons1") @pytest.mark.asyncio @@ -176,3 +467,40 @@ async def test_aggregate_metadata_name_guid(client): assert resp.status_code == 200 assert resp.json() == {"study2": {}} datastore.get_by_guid.assert_called_with("123") + + +@pytest.mark.asyncio +async def test_aggregate_metadata_get_schema(client): + schema = { + "_subjects_count": {"type": "integer", "description": ""}, + "year_awarded": {"type": "integer", "description": ""}, + } + with patch.object( + datastore, + "get_commons_attribute", + AsyncMock( + return_value={ + "_subjects_count": {"type": "integer", "description": ""}, + "year_awarded": {"type": "integer", "description": ""}, + } + ), + ): + resp = client.get("/aggregate/info/schema") + assert resp.status_code == 200 + assert resp.json() == schema + datastore.get_commons_attribute.assert_called_with("schema") + + with patch.object( + datastore, + "get_commons_attribute", + AsyncMock(return_value=None), + ) as datastore_mock: + # test for unknown info string + resp = client.get("/aggregate/info/nothing") + assert resp.status_code == 404 + assert resp.json() == { + "detail": { + "code": 404, + "message": "information for nothing not found", + } + } diff --git a/tests/test_populate.py b/tests/test_populate.py index a81ce3fc..cf666570 100644 --- a/tests/test_populate.py +++ b/tests/test_populate.py @@ -1,4 +1,6 @@ import pytest +import respx +import httpx from argparse import Namespace from mds.populate import ( parse_config_from_file, @@ -6,12 +8,22 @@ populate_metadata, main, filter_entries, + populate_info, + populate_drs_info, + populate_config, +) +from mds.agg_mds.commons import ( + AdapterMDSInstance, + MDSInstance, + Commons, + Settings, + FieldDefinition, + Config, ) -from mds.agg_mds.commons import AdapterMDSInstance, MDSInstance, Commons from mds.agg_mds import adapters from mds.agg_mds import datastore import json -from unittest.mock import 
patch, MagicMock
+from unittest.mock import patch, call, MagicMock
 from conftest import AsyncMock
 from tempfile import NamedTemporaryFile
 from pathlib import Path
@@ -25,16 +37,7 @@ async def test_parse_args():
     assert exception.code == 2
 
     known_args = parse_args(["--config", "some/file.json"])
-    assert known_args == Namespace(
-        config="some/file.json", hostname="localhost", port=6379
-    )
-
-    known_args = parse_args(
-        ["--config", "some/file.json", "--hostname", "server", "--port", "1000"]
-    )
-    assert known_args == Namespace(
-        config="some/file.json", hostname="server", port=1000
-    )
+    assert known_args == Namespace(config="some/file.json")
 
 
 @pytest.mark.asyncio
@@ -79,33 +82,318 @@ async def test_populate_metadata():
         {"my_category": ["my_name"]},
         {"commons_url": "http://commons"},
         "gen3_discovery",
+        False,
     )
 
 
 @pytest.mark.asyncio
-async def test_main():
+async def test_populate_info():
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_datastore:
+        with NamedTemporaryFile(mode="w+", delete=False) as fp:
+            json.dump(
+                {
+                    "configuration": {
+                        "schema": {
+                            "_subjects_count": {"type": "integer"},
+                            "study_description": {},
+                        },
+                    },
+                    "gen3_commons": {
+                        "mycommons": {
+                            "mds_url": "http://mds",
+                            "commons_url": "http://commons",
+                            "columns_to_fields": {
+                                "short_name": "name",
+                                "full_name": "full_name",
+                                "_subjects_count": "_subjects_count",
+                                "study_id": "study_id",
+                                "_unique_id": "_unique_id",
+                                "study_description": "study_description",
+                            },
+                        },
+                    },
+                    "adapter_commons": {
+                        "non-gen3": {
+                            "mds_url": "http://non-gen3",
+                            "commons_url": "non-gen3",
+                            "adapter": "icpsr",
+                        }
+                    },
+                },
+                fp,
+            )
+        config = parse_config_from_file(Path(fp.name))
+        await populate_info(config)
+        mock_datastore.update_global_info.assert_has_calls(
+            [
+                call("aggregations", {}, False),
+                call(
+                    "schema",
+                    {
+                        "_subjects_count": {"type": "integer", "description": ""},
+                        "study_description": {"type": "string", "description": ""},
+                    },
+                    False,
+                ),
+            ],
+            any_order=True,
+        )
+
+
+@pytest.mark.asyncio
+@respx.mock
+async def test_populate_drs_info():
+    mock_adapter = AsyncMock(return_value={})
+    # the patch must be started, otherwise it has no effect
+    patch("mds.agg_mds.adapters.get_metadata", mock_adapter).start()
+    with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_datastore:
+        with NamedTemporaryFile(mode="w+", delete=False) as fp:
+            json.dump(
+                {
+                    "configuration": {
+                        "schema": {
+                            "_subjects_count": {"type": "integer"},
+                            "study_description": {},
+                        },
+                        "settings": {
+                            "cache_drs": True,
+                            "drs_indexd_server": "http://test",
+                            "timestamp_entry": True,
+                        },
+                    },
+                },
+                fp,
+            )
+
+        json_data = [
+            {
+                "hints": [".*dg\\.XXTS.*"],
+                "host": "https://mytest1.commons.io/",
+                "name": "DataSTAGE",
+                "type": "indexd",
+            },
+            {
+                "hints": [".*dg\\.TSXX.*"],
+                "host": "https://commons2.io/index/",
+                "name": "Environmental DC",
+                "type": "indexd",
+            },
+        ]
+
+        respx.get("http://test/index/_dist").mock(
+            return_value=httpx.Response(
+                status_code=200,
+                json=json_data,
+            )
+        )
+
+        config = parse_config_from_file(Path(fp.name))
+        await populate_drs_info(config)
+        mock_datastore.update_global_info.assert_has_calls(
+            [
+                call(
+                    "dg.XXTS",
+                    {
+                        "host": "mytest1.commons.io",
+                        "name": "DataSTAGE",
+                        "type": "indexd",
+                    },
+                    False,
+                ),
+                call(
+                    "dg.TSXX",
+                    {
+                        "host": "commons2.io",
+                        "name": "Environmental DC",
+                        "type": "indexd",
+                    },
+                    False,
+                ),
+            ],
+            any_order=True,
+        )
+
+        await populate_drs_info(config, True)
+        mock_datastore.update_global_info.assert_has_calls(
+            [
+                call(
+                    "dg.XXTS",
+                    {
+                        "host": "mytest1.commons.io",
+                        "name": "DataSTAGE",
"DataSTAGE", + "type": "indexd", + }, + True, + ), + call( + "dg.TSXX", + { + "host": "commons2.io", + "name": "Environmental DC", + "type": "indexd", + }, + True, + ), + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_populate_config(): + with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_datastore: + with NamedTemporaryFile(mode="w+", delete=False) as fp: + json.dump( + { + "configuration": { + "schema": { + "_subjects_count": {"type": "array"}, + "study_description": {}, + }, + }, + "gen3_commons": { + "mycommons": { + "mds_url": "http://mds", + "commons_url": "http://commons", + "columns_to_fields": { + "short_name": "name", + "full_name": "full_name", + "_subjects_count": "_subjects_count", + "study_id": "study_id", + "_unique_id": "_unique_id", + "study_description": "study_description", + }, + }, + }, + "adapter_commons": { + "non-gen3": { + "mds_url": "http://non-gen3", + "commons_url": "non-gen3", + "adapter": "icpsr", + } + }, + }, + fp, + ) + config = parse_config_from_file(Path(fp.name)) + await populate_config(config) + mock_datastore.update_config_info.called_with(["_subjects_count"]) + + +@pytest.mark.asyncio +async def test_populate_config_to_temp_index(): + with patch("mds.agg_mds.datastore.client", AsyncMock()) as mock_datastore: + with NamedTemporaryFile(mode="w+", delete=False) as fp: + json.dump( + { + "configuration": { + "schema": { + "_subjects_count": {"type": "array"}, + "study_description": {}, + }, + }, + "gen3_commons": { + "mycommons": { + "mds_url": "http://mds", + "commons_url": "http://commons", + "columns_to_fields": { + "short_name": "name", + "full_name": "full_name", + "_subjects_count": "_subjects_count", + "study_id": "study_id", + "_unique_id": "_unique_id", + "study_description": "study_description", + }, + }, + }, + "adapter_commons": { + "non-gen3": { + "mds_url": "http://non-gen3", + "commons_url": "non-gen3", + "adapter": "icpsr", + } + }, + }, + fp, + ) + config = parse_config_from_file(Path(fp.name)) + await populate_config(config, True) + mock_datastore.update_config_info.called_with( + ["_subjects_count"], use_temp_index=True + ) + + +@respx.mock +@pytest.mark.asyncio +async def test_populate_main(): with patch("mds.config.USE_AGG_MDS", False): with pytest.raises(SystemExit) as pytest_wrapped_e: - await main(None, "", 0) + await main(commons_config=None) assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 patch("mds.config.USE_AGG_MDS", True).start() - patch("mds.populate.pull_mds", MagicMock()).start() patch.object(datastore, "init", AsyncMock()).start() - patch.object(datastore, "drop_all", AsyncMock()).start() + patch.object(datastore, "drop_all_non_temp_indexes", AsyncMock()).start() + patch.object(datastore, "drop_all_temp_indexes", AsyncMock()).start() + patch.object(datastore, "create_indexes", AsyncMock()).start() + patch.object(datastore, "create_temp_indexes", AsyncMock()).start() + patch.object(datastore, "update_config_info", AsyncMock()).start() patch.object(datastore, "get_status", AsyncMock(return_value="OK")).start() patch.object(datastore, "close", AsyncMock()).start() + patch.object(datastore, "update_global_info", AsyncMock()).start() patch.object(datastore, "update_metadata", AsyncMock()).start() patch.object(adapters, "get_metadata", MagicMock()).start() + patch.object(datastore, "clone_temp_indexes_to_real_indexes", AsyncMock()).start() + + json_response = { + "GSE63878": { + "_guid_type": "discovery_metadata", + "gen3_discovery": { + "link": 
"https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=GSE63878", + "tags": [{"name": "Array", "category": "Data Type"}], + "source": "Ichan School of Medicine at Mount Sinai", + "funding": "", + "study_description_summary": "The molecular factors involved in the development of Post-traumatic Stress Disorder (PTSD) remain poorly understood. Previous transcriptomic studies investigating the mechanisms of PTSD apply targeted approaches to identify individual genes under a cross-sectional framework lack a holistic view of the behaviours and properties of these genes at the system-level. Here we sought to apply an unsupervised gene-network-based approach to a prospective experimental design using whole-transcriptome RNA-Seq gene expression from peripheral blood leukocytes of U.S. Marines (N=188), obtained both pre- and post-deployment to conflict zones. We identified discrete groups of co-regulated genes (i.e., co-expression modules) and tested them for association to PTSD. We identified one module at both pre- and post-deployment containing putative causal signatures for PTSD development displaying an over-expression of genes enriched for functions of innate-immune response and interferon signalling (Type-I and Type-II). Importantly, these results were replicated in a second non-overlapping independent dataset of U.S. Marines (N=96), further outlining the role of innate immune and interferon signalling genes within co-expression modules to explain at least part of the causal pathophysiology for PTSD development. A second module, consequential of trauma exposure, contained PTSD resiliency signatures and an over-expression of genes involved in hemostasis and wound responsiveness suggesting that chronic levels of stress impair proper wound healing during/after exposure to the battlefield while highlighting the role of the hemostatic system as a clinical indicator of chronic-based stress. 
These findings provide novel insights for early preventative measures and advanced PTSD detection, which may lead to interventions that delay or perhaps abrogate the development of PTSD.\nWe used microarrays to characterize both prognostic and diagnostic molecular signatures associated to PTSD risk and PTSD status compared to control subjects.", + "study_title": "Gene Networks Specific for Innate Immunity Define Post-traumatic Stress Disorder [Affymetrix]", + "subjects_count": 48, + "accession_number": "GSE63878", + "data_files_count": 0, + "contributor": "me.foo@smartsite.com", + }, + } + } + + respx.get( + "http://test/ok//mds/metadata?data=True&_guid_type=discovery_metadata&limit=1000&offset=0" + ).mock( + return_value=httpx.Response( + status_code=200, + json=json_response, + ) + ) await main( Commons( + configuration=Config( + settings=Settings(), + schema={ + "_subjects_count": FieldDefinition(type="integer"), + "year_awarded": FieldDefinition(type="integer"), + }, + ), gen3_commons={ "my_commons": MDSInstance( - mds_url="", - commons_url="", - columns_to_fields={}, + mds_url="http://test/ok/", + commons_url="test", + columns_to_fields={ + "authz": "path:authz", + "tags": "path:tags", + "_subjects_count": "path:subjects_count", + "dbgap_accession_number": "path:study_id", + "study_description": "path:study_description_summary", + "number_of_datafiles": "path:data_files_count", + "investigator": "path:contributor", + }, ), }, adapter_commons={ @@ -115,10 +403,151 @@ async def test_main(): adapter="icpsr", ), }, - ), - "", - 0, + ) + ) + + +@respx.mock +@pytest.mark.asyncio +async def test_populate_main_fail(): + patch("mds.config.USE_AGG_MDS", True).start() + patch.object(datastore, "init", AsyncMock()).start() + patch.object(datastore, "drop_all_temp_indexes", AsyncMock()).start() + patch.object(datastore, "create_indexes", AsyncMock()).start() + patch.object(datastore, "create_temp_indexes", AsyncMock()).start() + patch.object(datastore, "update_config_info", AsyncMock()).start() + patch.object(datastore, "get_status", AsyncMock(return_value="OK")).start() + patch.object(datastore, "close", AsyncMock()).start() + patch.object(datastore, "update_global_info", AsyncMock()).start() + patch.object(datastore, "update_metadata", AsyncMock()).start() + patch.object(adapters, "get_metadata", MagicMock()).start() + patch.object(datastore, "clone_temp_indexes_to_real_indexes", AsyncMock()).start() + + existing_metadata = { + "GSE63878": { + "_guid_type": "discovery_metadata", + "gen3_discovery": { + "link": "https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=GSE63878", + "tags": [{"name": "Array", "category": "Data Type"}], + "source": "Ichan School of Medicine at Mount Sinai", + "funding": "", + "study_description_summary": "The molecular factors involved in the development of Post-traumatic Stress Disorder (PTSD) remain poorly understood. Previous transcriptomic studies investigating the mechanisms of PTSD apply targeted approaches to identify individual genes under a cross-sectional framework lack a holistic view of the behaviours and properties of these genes at the system-level. Here we sought to apply an unsupervised gene-network-based approach to a prospective experimental design using whole-transcriptome RNA-Seq gene expression from peripheral blood leukocytes of U.S. Marines (N=188), obtained both pre- and post-deployment to conflict zones. We identified discrete groups of co-regulated genes (i.e., co-expression modules) and tested them for association to PTSD. 
We identified one module at both pre- and post-deployment containing putative causal signatures for PTSD development displaying an over-expression of genes enriched for functions of innate-immune response and interferon signalling (Type-I and Type-II). Importantly, these results were replicated in a second non-overlapping independent dataset of U.S. Marines (N=96), further outlining the role of innate immune and interferon signalling genes within co-expression modules to explain at least part of the causal pathophysiology for PTSD development. A second module, consequential of trauma exposure, contained PTSD resiliency signatures and an over-expression of genes involved in hemostasis and wound responsiveness suggesting that chronic levels of stress impair proper wound healing during/after exposure to the battlefield while highlighting the role of the hemostatic system as a clinical indicator of chronic-based stress. These findings provide novel insights for early preventative measures and advanced PTSD detection, which may lead to interventions that delay or perhaps abrogate the development of PTSD.\nWe used microarrays to characterize both prognostic and diagnostic molecular signatures associated to PTSD risk and PTSD status compared to control subjects.",
+                "study_title": "Gene Networks Specific for Innate Immunity Define Post-traumatic Stress Disorder [Affymetrix]",
+                "subjects_count": 48,
+                "accession_number": "GSE63878",
+                "data_files_count": 0,
+                "contributor": "me.foo@smartsite.com",
+            },
+        }
+    }
+
+    # Mock get_all_metadata call to return proper document
+    get_all_metadata_mock = AsyncMock(return_value=existing_metadata)
+    patch.object(datastore, "get_all_metadata", get_all_metadata_mock).start()
+
+    # If drop_all is called, set get_all_metadata_mock return_value to None
+    def wipe_return_value(mock: AsyncMock):
+        mock.return_value = None
+
+    # wrap in a lambda so the wipe happens only when the mock is called,
+    # not at definition time
+    drop_all_indexes_mock = AsyncMock(
+        side_effect=lambda *args, **kwargs: wipe_return_value(get_all_metadata_mock)
+    )
+    patch.object(datastore, "drop_all_non_temp_indexes", drop_all_indexes_mock).start()
+
+    respx.get(
+        "http://testfail/ok//mds/metadata?data=True&_guid_type=discovery_metadata&limit=1000&offset=0"
+    ).mock(return_value=httpx.Response(status_code=500))
+    with pytest.raises(Exception):
+        await main(
+            Commons(
+                configuration=Config(
+                    settings=Settings(),
+                    schema={
+                        "_subjects_count": FieldDefinition(type="integer"),
+                        "year_awarded": FieldDefinition(type="integer"),
+                    },
+                ),
+                gen3_commons={
+                    "my_commons": MDSInstance(
+                        mds_url="http://testfail/ok/",
+                        commons_url="test",
+                        columns_to_fields={
+                            "authz": "path:authz",
+                            "tags": "path:tags",
+                            "_subjects_count": "path:subjects_count",
+                            "dbgap_accession_number": "path:study_id",
+                            "study_description": "path:study_description_summary",
+                            "number_of_datafiles": "path:data_files_count",
+                            "investigator": "path:contributor",
+                        },
+                    ),
+                },
+                adapter_commons={
+                    "adapter_commons": AdapterMDSInstance(
+                        mds_url="",
+                        commons_url="",
+                        adapter="icpsr",
+                    ),
+                },
+            )
+        )
+
+    # check that the get_all_metadata return value has not been changed
+    # since drop_all should not be called if an exception has been raised
+    es = await datastore.init("test", 9200)
+    assert (await es.get_all_metadata()) == existing_metadata
+
+    respx.get(
+        "http://test/ok//mds/metadata?data=True&_guid_type=discovery_metadata&limit=1000&offset=0"
+    ).mock(
+        return_value=httpx.Response(
+            status_code=200,
+            json=existing_metadata,
+        )
+    )
+
+    # Unable to update temp index, raise exception
+    patch.object(
+        datastore,
"update_metadata", + AsyncMock(side_effect=Exception("Unable")), + ).start() + with pytest.raises(Exception): + await main( + Commons( + configuration=Config( + settings=Settings(), + schema={ + "_subjects_count": FieldDefinition(type="integer"), + "year_awarded": FieldDefinition(type="integer"), + }, + ), + gen3_commons={ + "my_commons": MDSInstance( + mds_url="http://test/ok/", + commons_url="test", + columns_to_fields={ + "authz": "path:authz", + "tags": "path:tags", + "_subjects_count": "path:subjects_count", + "dbgap_accession_number": "path:study_id", + "study_description": "path:study_description_summary", + "number_of_datafiles": "path:data_files_count", + "investigator": "path:contributor", + }, + ), + }, + adapter_commons={ + "adapter_commons": AdapterMDSInstance( + mds_url="", + commons_url="", + adapter="icpsr", + ), + }, + ) + ) + + assert (await es.get_all_metadata()) == existing_metadata @pytest.mark.asyncio @@ -159,6 +588,12 @@ def test_parse_config_from_file(): with NamedTemporaryFile(mode="w+", delete=False) as fp: json.dump( { + "configuration": { + "schema": { + "_subjects_count": {"type": "integer"}, + "study_description": {}, + }, + }, "gen3_commons": { "mycommons": { "mds_url": "http://mds", @@ -187,7 +622,14 @@ def test_parse_config_from_file(): assert ( config.to_json() == Commons( - { + configuration=Config( + settings=Settings(), + schema={ + "_subjects_count": FieldDefinition(type="integer"), + "study_description": FieldDefinition(type="string"), + }, + ), + gen3_commons={ "mycommons": MDSInstance( "http://mds", "http://commons", @@ -201,7 +643,7 @@ def test_parse_config_from_file(): }, ) }, - { + adapter_commons={ "non-gen3": AdapterMDSInstance( "http://non-gen3", "non-gen3", @@ -210,3 +652,10 @@ def test_parse_config_from_file(): }, ).to_json() ) + + assert parse_config_from_file(Path("dummmy_files")) is None + + try: + parse_config_from_file(Path("/")) + except Exception as exc: + assert isinstance(exc, IOError) is True