Skip to content

Commit

Permalink
Add new resource for creating Whistle Mapping, Reconciliation and Bac…
Browse files Browse the repository at this point in the history
…kfill Pipeline Jobs for Healthcare Data Engine (GoogleCloudPlatform#11677)
  • Loading branch information
ashwinsshetty authored and niharika-98 committed Oct 7, 2024
1 parent 39be789 commit 7df5063
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 0 deletions.
261 changes: 261 additions & 0 deletions mmv1/products/healthcare/PipelineJob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
# Copyright 2024 Google Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Top-level settings of the PipelineJob resource: identity, reference links and
# the URL templates used for create, read, delete and import.
--- !ruby/object:Api::Resource
name: 'PipelineJob'
kind: 'healthcare#pipelineJob'
description: |
  PipelineJobs are Long Running Operations on Healthcare API to Map or Reconcile
  incoming data into FHIR format
references: !ruby/object:Api::Resource::ReferenceLinks
  guides:
    'Creating a PipelineJob': 'https://cloud.google.com/healthcare-api/private/healthcare-data-engine/docs/reference/rest/v1/projects.locations.datasets.pipelineJobs#PipelineJob'
  api: 'https://cloud.google.com/healthcare-api/healthcare-data-engine/docs/reference/rest/v1/projects.locations.datasets.pipelineJobs'
# The job ID is passed as the `pipelineJobId` query parameter on create, while
# read and delete address the job by path under the parent dataset.
base_url: '{{dataset}}/pipelineJobs?pipelineJobId={{name}}'
self_link: '{{dataset}}/pipelineJobs/{{name}}'
delete_url: '{{dataset}}/pipelineJobs/{{name}}'
skip_sweeper: true
update_verb: :PATCH
update_mask: true
id_format: '{{dataset}}/pipelineJobs/{{name}}'
# NOTE(review): `{{%dataset}}` presumably lets the captured dataset value
# contain slashes - confirm against the generator's import-format handling.
import_format:
- '{{%dataset}}/pipelineJobs/{{name}}'
- '{{name}}'
- '{{dataset}}/pipelineJobs?pipelineJobId={{name}}'
# Example templates rendered for this resource. Each `vars` entry is a value
# the matching template reads through ctx[:vars]; `primary_resource_id` is read
# through ctx[:primary_resource_id].
examples:
- !ruby/object:Provider::Terraform::Examples
  name: 'healthcare_pipeline_job_reconciliation'
  primary_resource_id: 'example-pipeline'
  vars:
    pipeline_name: 'example_pipeline_job'
    dataset_name: 'example_dataset'
    fhir_store_name: 'fhir_store'
    bucket_name: 'example_bucket_name'
- !ruby/object:Provider::Terraform::Examples
  name: 'healthcare_pipeline_job_backfill'
  primary_resource_id: 'example-pipeline'
  vars:
    backfill_pipeline_name: 'example_backfill_pipeline'
    dataset_name: 'example_dataset'
    mapping_pipeline_name: 'example_mapping_pipeline'
- !ruby/object:Provider::Terraform::Examples
  name: 'healthcare_pipeline_job_whistle_mapping'
  primary_resource_id: 'example-mapping-pipeline'
  vars:
    pipeline_name: 'example_mapping_pipeline_job'
    dataset_name: 'example_dataset'
    source_fhirstore_name: 'source_fhir_store'
    dest_fhirstore_name: 'dest_fhir_store'
    bucket_name: 'example_bucket_name'
- !ruby/object:Provider::Terraform::Examples
  name: 'healthcare_pipeline_job_mapping_recon_dest'
  primary_resource_id: 'example-mapping-pipeline'
  vars:
    pipeline_name: 'example_mapping_pipeline_job'
    recon_pipeline_name: 'example_recon_pipeline_job'
    dataset_name: 'example_dataset'
    source_fhirstore_name: 'source_fhir_store'
    dest_fhirstore_name: 'dest_fhir_store'
    bucket_name: 'example_bucket_name'
# Custom template hooks for the generated provider code.
# NOTE(review): judging by its file name, the decoder presumably rewrites the
# API's long resource name into self-link form - confirm in the template.
custom_code: !ruby/object:Provider::Terraform::CustomCode
  decoder: templates/terraform/decoders/long_name_to_self_link.go.erb
# Required, immutable inputs flagged `url_param_only`.
# NOTE(review): presumably these are used only to build request URLs and are
# not sent in the request body - confirm against the generator.
parameters:
- !ruby/object:Api::Type::String
  name: 'location'
  required: true
  immutable: true
  url_param_only: true
  description: |
    Location where the Pipeline Job is to run
- !ruby/object:Api::Type::String
  name: 'dataset'
  required: true
  immutable: true
  url_param_only: true
  description: |
    Healthcare Dataset under which the Pipeline Job is to run
# Fields of the PipelineJob resource.
properties:
# User-chosen job ID; it fills the {{name}} placeholder in the URL templates.
- !ruby/object:Api::Type::String
  name: 'name'
  description: |
    Specifies the name of the pipeline job. This field is user-assigned.
  required: true
# Optional switch; lineage writing stays enabled unless this is set to true.
- !ruby/object:Api::Type::Boolean
  name: 'disableLineage'
  description: |
    If true, disables writing lineage for the pipeline.
  required: false
  default_value: false
# Optional user labels. The description is a literal block scalar, so the regex
# and the JSON-style example are plain text rather than YAML syntax.
- !ruby/object:Api::Type::KeyValueLabels
  name: 'labels'
  required: false
  description: |
    User-supplied key-value pairs used to organize Pipeline Jobs.
    Label keys must be between 1 and 63 characters long, have a UTF-8 encoding of
    maximum 128 bytes, and must conform to the following PCRE regular expression:
    [\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}
    Label values are optional, must be between 1 and 63 characters long, have a
    UTF-8 encoding of maximum 128 bytes, and must conform to the following PCRE
    regular expression: [\p{Ll}\p{Lo}\p{N}_-]{0,63}
    No more than 64 labels can be associated with a given pipeline.
    An object containing a list of "key": value pairs.
    Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.
# Output-only field marked `ignore_read`. The description names the pipeline
# job, since this field belongs to the PipelineJob resource, not the dataset.
- !ruby/object:Api::Type::String
  name: 'selfLink'
  description: |
    The fully qualified name of this pipeline job
  output: true
  ignore_read: true
# Mapping pipeline configuration. Declared as conflicting with the
# reconciliation and backfill job blocks.
- !ruby/object:Api::Type::NestedObject
  name: mappingPipelineJob
  conflicts:
  - reconciliationPipelineJob
  - backfillPipelineJob
  description: |
    Specifies mapping configuration.
  required: false
  properties:
  - !ruby/object:Api::Type::NestedObject
    name: mappingConfig
    description: |
      The location of the mapping configuration.
    required: true
    properties:
    - !ruby/object:Api::Type::String
      name: description
      description: |
        Describes the mapping configuration.
      required: false
    - !ruby/object:Api::Type::NestedObject
      name: whistleConfigSource
      description: |
        Specifies the path to the mapping configuration for harmonization pipeline.
      required: false
      properties:
      - !ruby/object:Api::Type::String
        name: uri
        description: |
          Main configuration file which has the entrypoint or the root function.
          Example: gs://{bucket-id}/{path/to/import-root/dir}/entrypoint-file-name.wstl.
        required: true
      - !ruby/object:Api::Type::String
        name: importUriPrefix
        description: |
          Directory path where all the Whistle files are located.
          Example: gs://{bucket-id}/{path/to/import-root/dir}
        required: true
  - !ruby/object:Api::Type::NestedObject
    name: fhirStreamingSource
    description: |
      A streaming FHIR data source.
    required: false
    properties:
    - !ruby/object:Api::Type::String
      name: fhirStore
      description: |
        The path to the FHIR store in the format projects/{projectId}/locations/{locationId}/datasets/{datasetId}/fhirStores/{fhirStoreId}.
      required: true
    - !ruby/object:Api::Type::String
      name: description
      description: |
        Describes the streaming FHIR data source.
      required: false
  # The two output destinations below are declared as conflicting with each
  # other.
  - !ruby/object:Api::Type::String
    name: fhirStoreDestination
    conflicts:
    - reconciliationDestination
    description: |
      If set, the mapping pipeline will write snapshots to this
      FHIR store without assigning stable IDs. You must
      grant your pipeline project's Cloud Healthcare Service
      Agent service account healthcare.fhirResources.executeBundle
      and healthcare.fhirResources.create permissions on the
      destination store. The destination store must set
      `disable_referential_integrity`
      to true. The destination store must use FHIR version R4.
      Format: projects/{projectID}/locations/{locationID}/datasets/{datasetName}/fhirStores/{fhirStoreID}.
    required: false
  - !ruby/object:Api::Type::Boolean
    name: reconciliationDestination
    conflicts:
    - fhirStoreDestination
    description: |
      If set to true, a mapping pipeline will send output snapshots
      to the reconciliation pipeline in its dataset. A reconciliation
      pipeline must exist in this dataset before a mapping pipeline
      with a reconciliation destination can be created.
    required: false
# Reconciliation pipeline configuration. Declared as conflicting with the
# mapping and backfill job blocks.
- !ruby/object:Api::Type::NestedObject
  name: reconciliationPipelineJob
  conflicts:
  - mappingPipelineJob
  - backfillPipelineJob
  description: |
    Specifies reconciliation configuration.
  required: false
  properties:
  - !ruby/object:Api::Type::NestedObject
    name: mergeConfig
    description: |
      Specifies the location of the reconciliation configuration.
    required: true
    properties:
    - !ruby/object:Api::Type::String
      name: description
      description: |
        Describes the mapping configuration.
      required: false
    - !ruby/object:Api::Type::NestedObject
      name: whistleConfigSource
      description: |
        Specifies the path to the mapping configuration for harmonization pipeline.
      required: true
      properties:
      - !ruby/object:Api::Type::String
        name: uri
        description: |
          Main configuration file which has the entrypoint or the root function.
          Example: gs://{bucket-id}/{path/to/import-root/dir}/entrypoint-file-name.wstl.
        required: true
      - !ruby/object:Api::Type::String
        name: importUriPrefix
        description: |
          Directory path where all the Whistle files are located.
          Example: gs://{bucket-id}/{path/to/import-root/dir}
        required: true
  - !ruby/object:Api::Type::String
    name: matchingUriPrefix
    description: |
      Specifies the top level directory of the matching configs used
      in all mapping pipelines, which extract properties for resources
      to be matched on.
      Example: gs://{bucket-id}/{path/to/matching/configs}
    required: true
  - !ruby/object:Api::Type::String
    name: fhirStoreDestination
    description: |
      The harmonized FHIR store to write harmonized FHIR resources to,
      in the format of: projects/{projectID}/locations/{locationID}/datasets/{datasetName}/fhirStores/{id}
    required: false
# Backfill pipeline configuration. Declared as conflicting with the mapping and
# reconciliation job blocks.
- !ruby/object:Api::Type::NestedObject
  name: backfillPipelineJob
  conflicts:
  - mappingPipelineJob
  - reconciliationPipelineJob
  description: |
    Specifies the backfill configuration.
  required: false
  properties:
  # A string reference to a mapping pipeline job by its full resource name;
  # not the same thing as the top-level mappingPipelineJob block.
  - !ruby/object:Api::Type::String
    name: mappingPipelineJob
    description: |
      Specifies the mapping pipeline job to backfill, the name format
      should follow: projects/{projectId}/locations/{locationId}/datasets/{datasetId}/pipelineJobs/{pipelineJobId}.
    required: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Backfill pipeline job that points at a mapping pipeline job by resource name.
resource "google_healthcare_pipeline_job" "<%= ctx[:primary_resource_id] %>" {
  name     = "<%= ctx[:vars]['backfill_pipeline_name'] %>"
  location = "us-central1"
  dataset  = google_healthcare_dataset.dataset.id

  backfill_pipeline_job {
    # The collection segment is "pipelineJobs" (camelCase), matching the
    # documented format .../datasets/{datasetId}/pipelineJobs/{pipelineJobId}.
    mapping_pipeline_job = "${google_healthcare_dataset.dataset.id}/pipelineJobs/<%= ctx[:vars]['mapping_pipeline_name'] %>"
  }
}

# Parent dataset under which the pipeline job is created.
resource "google_healthcare_dataset" "dataset" {
  location = "us-central1"
  name     = "<%= ctx[:vars]['dataset_name'] %>"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Reconciliation pipeline job. The mapping pipeline job in this example depends
# on it through depends_on.
resource "google_healthcare_pipeline_job" "recon" {
  name            = "<%= ctx[:vars]['recon_pipeline_name'] %>"
  dataset         = google_healthcare_dataset.dataset.id
  location        = "us-central1"
  disable_lineage = true

  reconciliation_pipeline_job {
    matching_uri_prefix    = "gs://${google_storage_bucket.bucket.name}"
    fhir_store_destination = "${google_healthcare_dataset.dataset.id}/fhirStores/${google_healthcare_fhir_store.dest_fhirstore.name}"

    merge_config {
      description = "sample description for reconciliation rules"

      whistle_config_source {
        import_uri_prefix = "gs://${google_storage_bucket.bucket.name}"
        uri               = "gs://${google_storage_bucket.bucket.name}/${google_storage_bucket_object.merge_file.name}"
      }
    }
  }
}

# Mapping pipeline job that streams from the source FHIR store and sends its
# output to the dataset's reconciliation pipeline.
resource "google_healthcare_pipeline_job" "<%= ctx[:primary_resource_id] %>" {
  name            = "<%= ctx[:vars]['pipeline_name'] %>"
  dataset         = google_healthcare_dataset.dataset.id
  location        = "us-central1"
  disable_lineage = true

  labels = {
    example_label_key = "example_label_value"
  }

  mapping_pipeline_job {
    reconciliation_destination = true

    mapping_config {
      description = "example description for mapping configuration"

      whistle_config_source {
        import_uri_prefix = "gs://${google_storage_bucket.bucket.name}"
        uri               = "gs://${google_storage_bucket.bucket.name}/${google_storage_bucket_object.mapping_file.name}"
      }
    }

    fhir_streaming_source {
      description = "example description for streaming fhirstore"
      fhir_store  = "${google_healthcare_dataset.dataset.id}/fhirStores/${google_healthcare_fhir_store.source_fhirstore.name}"
    }
  }

  # Create the reconciliation pipeline job first.
  depends_on = [google_healthcare_pipeline_job.recon]
}

# Parent dataset shared by both pipeline jobs and both FHIR stores.
resource "google_healthcare_dataset" "dataset" {
  location = "us-central1"
  name     = "<%= ctx[:vars]['dataset_name'] %>"
}

# FHIR store used as the streaming source of the mapping pipeline job.
resource "google_healthcare_fhir_store" "source_fhirstore" {
  name    = "<%= ctx[:vars]['source_fhirstore_name'] %>"
  dataset = google_healthcare_dataset.dataset.id
  version = "R4"

  disable_referential_integrity = true
  enable_update_create          = true
}

# FHIR store used as the destination of the reconciliation pipeline job.
resource "google_healthcare_fhir_store" "dest_fhirstore" {
  name    = "<%= ctx[:vars]['dest_fhirstore_name'] %>"
  dataset = google_healthcare_dataset.dataset.id
  version = "R4"

  disable_referential_integrity = true
  enable_update_create          = true
}

# Bucket holding the Whistle files referenced by the pipeline jobs.
resource "google_storage_bucket" "bucket" {
  name                        = "<%= ctx[:vars]['bucket_name'] %>"
  uniform_bucket_level_access = true
  location                    = "us-central1"
}

# Placeholder Whistle file used as the mapping pipeline's entrypoint URI.
resource "google_storage_bucket_object" "mapping_file" {
  bucket  = google_storage_bucket.bucket.name
  name    = "mapping.wstl"
  content = " "
}

# Placeholder Whistle file used as the reconciliation merge config's URI.
resource "google_storage_bucket_object" "merge_file" {
  bucket  = google_storage_bucket.bucket.name
  name    = "merge.wstl"
  content = " "
}
Loading

0 comments on commit 7df5063

Please sign in to comment.