Skip to content

Commit

Permalink
Feature/mx 1586 wrap up seq repo (#106)
Browse files Browse the repository at this point in the history
Several issues were pointed out during wrap-up; this PR fixes them:
- duplicate activities
- the number of distributions not matching the number of resources


# Changes
- combine seq-repo distribution and resource extraction in one asset
- duplicate seq-repo activities are filtered out

---------

Signed-off-by: erichesse <[email protected]>
Signed-off-by: erichesse <[email protected]>
Co-authored-by: Eric Heße <[email protected]>
  • Loading branch information
mr-kamran-ali and erichesse authored Jun 26, 2024
1 parent 3669319 commit 4e2a348
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 185 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changes

- combine seq-repo distribution and resource extraction in one asset
- duplicate seq-repo activities are filtered out

### Deprecated

### Removed
Expand All @@ -36,7 +39,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- wikidata.extract module
- extract voxco data


### Changes

- update mex-common to 0.27.1
Expand Down
81 changes: 26 additions & 55 deletions mex/seq_repo/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from itertools import tee

from mex.common.cli import entrypoint
from mex.common.ldap.extract import get_merged_ids_by_query_string
from mex.common.ldap.models.person import LDAPPersonWithQuery
Expand Down Expand Up @@ -32,8 +30,7 @@
from mex.seq_repo.transform import (
transform_seq_repo_access_platform_to_extracted_access_platform,
transform_seq_repo_activities_to_extracted_activities,
transform_seq_repo_distribution_to_extracted_distribution,
transform_seq_repo_resource_to_extracted_resource,
transform_seq_repo_resource_to_extracted_resource_and_distribution,
)
from mex.sinks import load

Expand Down Expand Up @@ -120,12 +117,8 @@ def extracted_activity(
project_coordinators_merged_ids_by_query_string,
extracted_primary_source_seq_repo,
)
mex_activities_gens = tee(mex_activities, 2)
load(mex_activities_gens[0])
return {
activity.identifierInPrimarySource: activity
for activity in mex_activities_gens[1]
}
load(mex_activities)
return {activity.identifierInPrimarySource: activity for activity in mex_activities}


@asset(group_name="seq_repo")
Expand All @@ -152,39 +145,10 @@ def seq_repo_extracted_access_platform(


@asset(group_name="seq_repo")
def extracted_distribution(
def seq_repo_resource_and_distribution(
seq_repo_latest_source: dict[str, SeqRepoSource],
extracted_primary_source_seq_repo: ExtractedPrimarySource,
extracted_organization_rki: ExtractedOrganization,
seq_repo_extracted_access_platform: ExtractedAccessPlatform,
) -> dict[str, ExtractedDistribution]:
"""Extract distribution from Seq-Repo."""
settings = SeqRepoSettings.get()
distribution = extract_mapping_data(
settings.mapping_path / "distribution.yaml",
ExtractedDistribution,
)
mex_distributions = transform_seq_repo_distribution_to_extracted_distribution(
seq_repo_latest_source,
distribution,
seq_repo_extracted_access_platform,
extracted_organization_rki,
extracted_primary_source_seq_repo,
)

mex_distributions_gens = tee(mex_distributions, 2)
load(mex_distributions_gens[0])
return {
distribution.identifierInPrimarySource: distribution
for distribution in mex_distributions_gens[1]
}


@asset(group_name="seq_repo")
def seq_repo_resource(
seq_repo_latest_source: dict[str, SeqRepoSource],
extracted_distribution: dict[str, ExtractedDistribution],
extracted_activity: dict[str, ExtractedActivity],
seq_repo_extracted_access_platform: ExtractedAccessPlatform,
seq_repo_source_resolved_project_coordinators: list[LDAPPersonWithQuery],
unit_stable_target_ids_by_synonym: dict[str, MergedOrganizationalUnitIdentifier],
project_coordinators_merged_ids_by_query_string: dict[
Expand All @@ -193,29 +157,36 @@ def seq_repo_resource(
extracted_organization_rki: ExtractedOrganization,
extracted_primary_source_seq_repo: ExtractedPrimarySource,
) -> list[ExtractedResource]:
"""Extract resource from Seq-Repo."""
"""Extract resource and distribution from Seq-Repo."""
settings = SeqRepoSettings.get()
resource = extract_mapping_data(
settings.mapping_path / "resource.yaml",
ExtractedResource,
)
distribution = extract_mapping_data(
settings.mapping_path / "distribution.yaml",
ExtractedDistribution,
)

mex_resources = transform_seq_repo_resource_to_extracted_resource(
seq_repo_latest_source,
extracted_distribution,
extracted_activity,
resource,
seq_repo_source_resolved_project_coordinators,
unit_stable_target_ids_by_synonym,
project_coordinators_merged_ids_by_query_string,
extracted_organization_rki,
extracted_primary_source_seq_repo,
mex_resources, mex_distributions = (
transform_seq_repo_resource_to_extracted_resource_and_distribution(
seq_repo_latest_source,
extracted_activity,
seq_repo_extracted_access_platform,
resource,
distribution,
seq_repo_source_resolved_project_coordinators,
unit_stable_target_ids_by_synonym,
project_coordinators_merged_ids_by_query_string,
extracted_organization_rki,
extracted_primary_source_seq_repo,
)
)

mex_sources_list = list(mex_resources)
load(mex_sources_list)
load(mex_resources)
load(mex_distributions)

return list(mex_sources_list)
return mex_resources


@entrypoint(SeqRepoSettings)
Expand Down
98 changes: 48 additions & 50 deletions mex/seq_repo/transform.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections.abc import Generator
from typing import Any

from mex.common.ldap.models.person import LDAPPersonWithQuery
Expand Down Expand Up @@ -26,8 +25,8 @@ def transform_seq_repo_activities_to_extracted_activities(
str, list[MergedPersonIdentifier]
],
extracted_primary_source: ExtractedPrimarySource,
) -> Generator[ExtractedActivity, None, None]:
"""Transform seq-repo activity to ExtractedActivity.
) -> list[ExtractedActivity]:
"""Transform seq-repo activities to list of unique ExtractedActivity.
Args:
seq_repo_sources: Seq Repo extracted sources
Expand All @@ -40,9 +39,10 @@ def transform_seq_repo_activities_to_extracted_activities(
extracted_primary_source: Extracted primary source
Returns:
Generator for ExtractedActivity
list of unique ExtractedActivity
"""
theme = seq_repo_activity["theme"][0]["mappingRules"][0]["setValues"]
unique_activities = []

for source in seq_repo_sources.values():
project_coordinators_ids, responsible_units = (
Expand All @@ -56,7 +56,8 @@ def transform_seq_repo_activities_to_extracted_activities(

if not responsible_units or not project_coordinators_ids:
continue
yield ExtractedActivity(

extracted_activity = ExtractedActivity(
contact=project_coordinators_ids,
hadPrimarySource=extracted_primary_source.stableTargetId,
identifierInPrimarySource=source.project_id,
Expand All @@ -66,65 +67,37 @@ def transform_seq_repo_activities_to_extracted_activities(
title=source.project_name,
)

if extracted_activity not in unique_activities:
unique_activities.append(extracted_activity)

def transform_seq_repo_distribution_to_extracted_distribution(
seq_repo_sources: dict[str, SeqRepoSource],
seq_repo_distribution: dict[str, Any],
mex_access_platform: ExtractedAccessPlatform,
extracted_organization_rki: ExtractedOrganization,
extracted_primary_source: ExtractedPrimarySource,
) -> Generator[ExtractedDistribution, None, None]:
"""Transform seq-repo distribution to ExtractedDistribution.
Args:
seq_repo_sources: Seq Repo extracted sources
seq_repo_distribution: Seq Repo extracted distribution
mex_access_platform: Extracted access platform
extracted_organization_rki: wikidata extracted organization
extracted_primary_source: Extracted primary source
Returns:
Generator for ExtractedDistribution
"""
access_restriction = seq_repo_distribution["accessRestriction"][0]["mappingRules"][
0
]["setValues"]
media_type = seq_repo_distribution["mediaType"][0]["mappingRules"][0]["setValues"]
title = seq_repo_distribution["title"][0]["mappingRules"][0]["setValues"]

for identifier_in_primary_source, source in seq_repo_sources.items():
yield ExtractedDistribution(
accessService=mex_access_platform.stableTargetId,
accessRestriction=access_restriction,
hadPrimarySource=extracted_primary_source.stableTargetId,
identifierInPrimarySource=identifier_in_primary_source,
issued=source.sequencing_date,
mediaType=media_type,
publisher=extracted_organization_rki.stableTargetId,
title=title,
)
return unique_activities


def transform_seq_repo_resource_to_extracted_resource(
def transform_seq_repo_resource_to_extracted_resource_and_distribution(
seq_repo_sources: dict[str, SeqRepoSource],
seq_repo_distributions: dict[str, ExtractedDistribution],
seq_repo_activities: dict[str, ExtractedActivity],
mex_access_platform: ExtractedAccessPlatform,
seq_repo_resource: dict[str, Any],
seq_repo_distribution: dict[str, Any],
seq_repo_source_resolved_project_coordinators: list[LDAPPersonWithQuery],
unit_stable_target_ids_by_synonym: dict[str, MergedOrganizationalUnitIdentifier],
project_coordinators_merged_ids_by_query_string: dict[
str, list[MergedPersonIdentifier]
],
extracted_organization_rki: ExtractedOrganization,
extracted_primary_source: ExtractedPrimarySource,
) -> Generator[ExtractedResource, None, None]:
"""Transform seq-repo resource to ExtractedResource.
) -> tuple[list[ExtractedResource], list[ExtractedDistribution]]:
"""Transform seq-repo resources and distributions.
transform seq-repo resources to ExtractedResource and
transform seq-repo distributions to ExtractedDistribution.
Args:
seq_repo_sources: Seq Repo extracted sources
seq_repo_distributions: Seq Repo extracted distribution
seq_repo_activities: Seq Repo extracted activity for default values from mapping
mex_access_platform: Extracted access platform
seq_repo_resource: Seq Repo extracted resource
seq_repo_distribution: Seq Repo extracted distribution
seq_repo_source_resolved_project_coordinators: Seq Repo sources resolved project
coordinators ldap query results
unit_stable_target_ids_by_synonym: Unit stable target ids by synonym
Expand All @@ -134,8 +107,9 @@ def transform_seq_repo_resource_to_extracted_resource(
extracted_primary_source: Extracted primary source
Returns:
Generator for ExtractedResource
lists of ExtractedResource and ExtractedDistribution
"""
# Resource values from mapping
access_restriction = seq_repo_resource["accessRestriction"][0]["mappingRules"][0][
"setValues"
]
Expand All @@ -159,8 +133,16 @@ def transform_seq_repo_resource_to_extracted_resource(
][0]["setValues"]
theme = seq_repo_resource["theme"][0]["mappingRules"][0]["setValues"]

# Distribution values from mapping
access_restriction = seq_repo_distribution["accessRestriction"][0]["mappingRules"][
0
]["setValues"]
media_type = seq_repo_distribution["mediaType"][0]["mappingRules"][0]["setValues"]
title = seq_repo_distribution["title"][0]["mappingRules"][0]["setValues"]

extracted_resources = []
extracted_distributions = []
for identifier_in_primary_source, source in seq_repo_sources.items():
distribution = seq_repo_distributions[identifier_in_primary_source]
activity = seq_repo_activities.get(source.project_id)

project_coordinators_ids, units_in_charge = (
Expand All @@ -179,14 +161,27 @@ def transform_seq_repo_resource_to_extracted_resource(
source.customer_org_unit_id
)

yield ExtractedResource(
extracted_distribution = ExtractedDistribution(
accessService=mex_access_platform.stableTargetId,
accessRestriction=access_restriction,
hadPrimarySource=extracted_primary_source.stableTargetId,
identifierInPrimarySource=identifier_in_primary_source,
issued=source.sequencing_date,
mediaType=media_type,
publisher=extracted_organization_rki.stableTargetId,
title=title,
)
extracted_distributions.append(extracted_distribution)

extracted_resource = ExtractedResource(
accessPlatform=mex_access_platform.stableTargetId,
accessRestriction=access_restriction,
accrualPeriodicity=accrual_periodicity,
anonymizationPseudonymization=anonymization_pseudonymization,
contact=project_coordinators_ids,
contributingUnit=contributing_unit or [],
created=source.sequencing_date,
distribution=distribution.stableTargetId,
distribution=extracted_distribution.stableTargetId,
hadPrimarySource=extracted_primary_source.stableTargetId,
identifierInPrimarySource=identifier_in_primary_source,
instrumentToolOrApparatus=source.sequencing_platform,
Expand All @@ -202,6 +197,9 @@ def transform_seq_repo_resource_to_extracted_resource(
unitInCharge=units_in_charge,
wasGeneratedBy=activity.stableTargetId if activity else None,
)
extracted_resources.append(extracted_resource)

return extracted_resources, extracted_distributions


def transform_seq_repo_access_platform_to_extracted_access_platform(
Expand Down
26 changes: 0 additions & 26 deletions tests/seq_repo/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from mex.common.models import (
ExtractedAccessPlatform,
ExtractedActivity,
ExtractedDistribution,
ExtractedOrganization,
ExtractedPerson,
ExtractedPrimarySource,
)
Expand All @@ -28,7 +26,6 @@
from mex.seq_repo.transform import (
transform_seq_repo_access_platform_to_extracted_access_platform,
transform_seq_repo_activities_to_extracted_activities,
transform_seq_repo_distribution_to_extracted_distribution,
)


Expand Down Expand Up @@ -395,29 +392,6 @@ def extracted_mex_activities_dict(
}


@pytest.fixture
def extracted_mex_distribution_dict(
extracted_primary_source_seq_repo: ExtractedPrimarySource,
seq_repo_latest_sources: dict[str, SeqRepoSource],
extracted_mex_access_platform: ExtractedAccessPlatform,
seq_repo_distribution: dict[str, Any],
extracted_organization_rki: ExtractedOrganization,
) -> dict[str, ExtractedDistribution]:
extracted_mex_distributions = (
transform_seq_repo_distribution_to_extracted_distribution(
seq_repo_latest_sources,
seq_repo_distribution,
extracted_mex_access_platform,
extracted_organization_rki,
extracted_primary_source_seq_repo,
)
)
return {
distribution.identifierInPrimarySource: distribution
for distribution in extracted_mex_distributions
}


@pytest.fixture
def seq_repo_source_resolved_project_coordinators() -> list[LDAPPersonWithQuery]:
"""Extract source project coordinators."""
Expand Down
Loading

0 comments on commit 4e2a348

Please sign in to comment.