Skip to content

Commit

Permalink
Merge pull request #69 from uc-cdis/feat/expires_in
Browse files Browse the repository at this point in the history
PXP-10558 Pelican export: store PFB file's `_expires_at` in MDS
  • Loading branch information
paulineribeyre authored Feb 14, 2023
2 parents 1d96702 + 046e5e5 commit f662956
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ repos:
- id: no-commit-to-branch
args: [--branch, develop, --branch, master, --pattern, release/.*]
- repo: https://github.com/psf/black
rev: 19.10b0
rev: 22.12.0
hooks:
- id: black
11 changes: 8 additions & 3 deletions docs/pelican-export.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ Start the Sower job by hitting the `POST <base URL>/job/dispatch` endpoint with

## Setup

1. Run `gen3 kube-setup-pelicanjob`. This will create the S3 bucket for exported PFB files.
2. Setup and install [Sower](https://github.com/uc-cdis/sower).
3. Update the manifest to include the configuration for the `pelican-export` job:
1. This job should be deployed alongside the `metadata-delete-expired-objects` cronjob:
- Add `"metadata-delete-expired-objects": "quay.io/cdis/metadata-delete-expired-objects:<version>"` to the `versions` block of the manifest.
- Run `gen3 kube-setup-metadata-delete-expired-objects-cronjob`.
- Grant the `metadata-delete-expired-objects-job` client access to `(resource=/mds_gateway, method=access, service=mds_gateway)` and `(resource=/programs, method=delete, service=fence)` in the `user.yaml`.
2. Run `gen3 kube-setup-pelicanjob`. This will create the S3 bucket for exported PFB files and the Fence client for submitting to the metadata service.
3. Grant the `pelican-export-job` client access to `(resource=/mds_gateway, method=access, service=mds_gateway)` in the `user.yaml`.
4. Set up and deploy [Sower](https://github.com/uc-cdis/sower).
5. Update the manifest to include the configuration for the `pelican-export` job:

##### Manifest configuration

Expand Down
39 changes: 36 additions & 3 deletions job_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,37 @@
from pfb.writer import PFBWriter
from pyspark import SparkConf
from pyspark.sql import SparkSession
import requests

from pelican.dictionary import init_dictionary, DataDictionaryTraversal
from pelican.graphql.guppy_gql import GuppyGQL
from pelican.jobs import export_pfb_job
from pelican.s3 import s3upload_file
from pelican.indexd import indexd_submit
from pelican.mds import metadata_submit_expiration

if __name__ == "__main__":
node = os.environ["ROOT_NODE"]
access_token = os.environ["ACCESS_TOKEN"]
input_data = os.environ["INPUT_DATA"]
access_format = os.environ["ACCESS_FORMAT"]
# the PFB file and indexd/mds records expire after 14 days by default
record_expiration_days = os.environ.get("RECORD_EXPIRATION_DAYS", 14)

print("This is the format")
print(access_format)

with open("/pelican-creds.json") as pelican_creds_file:
pelican_creds = json.load(pelican_creds_file)
for key in [
"manifest_bucket_name",
"aws_access_key_id",
"aws_secret_access_key",
"fence_client_id",
"fence_client_secret",
]:
assert pelican_creds.get(key), f"No '{key}' in config"

input_data = json.loads(input_data)

gql = GuppyGQL(
Expand Down Expand Up @@ -117,9 +132,6 @@
True, # include upward nodes: project, program etc
)

with open("/pelican-creds.json") as pelican_creds_file:
pelican_creds = json.load(pelican_creds_file)

avro_filename = "{}.avro".format(
datetime.now().strftime("export_%Y-%m-%dT%H:%M:%S")
)
Expand Down Expand Up @@ -162,6 +174,20 @@

s3_url = "s3://" + pelican_creds["manifest_bucket_name"] + "/" + avro_filename

# exchange the client ID and secret for an access token
r = requests.post(
f"{COMMONS}user/oauth2/token?grant_type=client_credentials&scope=openid",
auth=(
pelican_creds["fence_client_id"],
pelican_creds["fence_client_secret"],
),
)
if r.status_code != 200:
raise Exception(
f"Failed to obtain access token using OIDC client credentials - {r.status_code}:\n{r.text}"
)
client_access_token = r.json()["access_token"]

indexd_record = indexd_submit(
COMMONS,
indexd_creds["user_db"]["gdcapi"],
Expand All @@ -172,6 +198,13 @@
authz,
)

metadata_submit_expiration(
hostname=COMMONS,
guid=indexd_record["did"],
access_token=client_access_token,
record_expiration_days=record_expiration_days,
)

# send s3 link and information to indexd to create guid and send it back
print("[out] {}".format(indexd_record["did"]))

Expand Down
1 change: 0 additions & 1 deletion pelican/indexd.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import requests
import json
import base64


def indexd_submit(
Expand Down
21 changes: 21 additions & 0 deletions pelican/mds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from datetime import datetime, timedelta
import requests


def metadata_submit_expiration(hostname, guid, access_token, record_expiration_days):
    """Create a metadata-service record for ``guid`` with an ``_expires_at`` field.

    The ``metadata-delete-expired-objects`` cronjob later deletes records (and
    their files) whose ``_expires_at`` epoch timestamp is in the past.

    Args:
        hostname (str): Commons base URL; expected to end with a trailing slash.
        guid (str): indexd GUID of the exported PFB record.
        access_token (str): token sent as a bearer Authorization header.
        record_expiration_days (int | float | str): days from now until the
            record expires. Coerced to ``float`` because values read from
            environment variables arrive as strings.

    Raises:
        Exception: if the metadata service does not respond with HTTP 201.
    """
    # Coerce to float: the caller may pass the raw RECORD_EXPIRATION_DAYS env
    # var (a string), which timedelta(days=...) would reject with a TypeError.
    # A naive datetime's .timestamp() still yields a correct POSIX epoch value.
    expires_at = (
        datetime.now() + timedelta(days=float(record_expiration_days))
    ).timestamp()
    url = f"{hostname}mds/metadata/{guid}"
    body = {"_expires_at": expires_at}
    print(f"Creating metadata record: POST {url} {body}")
    r = requests.post(
        url,
        json=body,
        headers={"Authorization": f"bearer {access_token}"},
    )
    if r.status_code != 201:
        raise Exception(
            f"Submission to metadata-service failed with {r.status_code}:\n{r.text}"
        )

0 comments on commit f662956

Please sign in to comment.