workflows[data]: accept to_date in harvest interval
DonHaul committed Feb 3, 2025
1 parent 91f5a91 commit 0a23de9
Showing 3 changed files with 166 additions and 14 deletions.
23 changes: 19 additions & 4 deletions workflows/dags/data/data_harvest.py
@@ -21,8 +21,10 @@
schedule="@daily",
catchup=False,
tags=["data"],
max_active_runs=5,
params={"last_updated": Param(type=["null", "string"], default="")},
params={
"last_updated_from": Param(type=["null", "string"], default=""),
"last_updated_to": Param(type=["null", "string"], default=""),
},
)
def data_harvest_dag():
"""Defines the DAG for the HEPData harvest workflow.
@@ -48,14 +50,27 @@ def collect_ids(**context):
Returns: list of ids
"""
from_date = (
context["params"]["last_updated"]
if context["params"]["last_updated"]
context["params"]["last_updated_from"]
if context["params"]["last_updated_from"]
else ds_add(context["ds"], -1)
)
to_date = context["params"]["last_updated_to"]

payload = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
hepdata_response = generic_http_hook.call_api(
endpoint="/search/ids", method="GET", params=payload
)
if to_date:
payload = {
"inspire_ids": True,
"last_updated": to_date,
"sort_by": "latest",
}
hepdata_to_response = generic_http_hook.call_api(
endpoint="/search/ids", method="GET", params=payload
)
return sorted(set(hepdata_response.json()) - set(hepdata_to_response.json()))

return hepdata_response.json()
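
The branch above reduces the interval to two "since" queries and a set difference: everything updated since from_date, minus everything already updated since to_date, leaves the ids whose last update falls inside [from_date, to_date). A self-contained sketch of the same logic, using plain requests instead of the repo's generic_http_hook (the endpoint and query params are the ones exercised by the cassette below):

# Sketch only; ids_updated_between is a hypothetical helper.
import requests

HEPDATA_SEARCH_IDS = "https://www.hepdata.net/search/ids"

def ids_updated_between(from_date, to_date=None):
    params = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
    since_from = set(requests.get(HEPDATA_SEARCH_IDS, params=params).json())
    if not to_date:
        return sorted(since_from)
    params["last_updated"] = to_date
    since_to = set(requests.get(HEPDATA_SEARCH_IDS, params=params).json())
    # Updated since from_date but not since to_date => inside the interval.
    return sorted(since_from - since_to)
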

120 changes: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: https://www.hepdata.net/search/ids?inspire_ids=True&last_updated=2025-01-25&sort_by=latest
response:
body:
string: '[2846106,2836178,2872775,2872501]
'
headers:
access-control-allow-origin:
- '*'
alt-svc:
- h3=":443";ma=60;
content-length:
- '34'
content-type:
- application/json
date:
- Mon, 03 Feb 2025 13:22:37 GMT
permissions-policy:
- interest-cohort=()
referrer-policy:
- strict-origin-when-cross-origin
retry-after:
- '60'
server:
- gunicorn
set-cookie:
- session=ae88aea732751134_67a0c31d.p3L-3XEtP37iCxKBijecugLw_-o; Domain=.www.hepdata.net;
Expires=Tue, 04 Feb 2025 01:22:37 GMT; HttpOnly; Path=/; SameSite=Lax
transfer-encoding:
- chunked
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- sameorigin
x-proxy-backend:
- hepdata-prod_hepdata-web_http
x-ratelimit-limit:
- '60'
x-ratelimit-remaining:
- '59'
x-ratelimit-reset:
- '1738589018'
x-xss-protection:
- 1; mode=block
status:
code: 200
message: OK
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: https://www.hepdata.net/search/ids?inspire_ids=True&last_updated=2025-01-30&sort_by=latest
response:
body:
string: '[2846106,2836178]
'
headers:
access-control-allow-origin:
- '*'
alt-svc:
- h3=":443";ma=60;
content-length:
- '18'
content-type:
- application/json
date:
- Mon, 03 Feb 2025 13:22:37 GMT
permissions-policy:
- interest-cohort=()
referrer-policy:
- strict-origin-when-cross-origin
retry-after:
- '60'
server:
- gunicorn
set-cookie:
- session=391a8555c6446562_67a0c31d.5rDsKmvxvMxWODzSsA1tP-uGKMI; Domain=.www.hepdata.net;
Expires=Tue, 04 Feb 2025 01:22:37 GMT; HttpOnly; Path=/; SameSite=Lax
transfer-encoding:
- chunked
vary:
- Accept-Encoding
x-content-type-options:
- nosniff
x-frame-options:
- sameorigin
x-proxy-backend:
- hepdata-prod_hepdata-web_http
x-ratelimit-limit:
- '60'
x-ratelimit-remaining:
- '58'
x-ratelimit-reset:
- '1738589018'
x-xss-protection:
- 1; mode=block
status:
code: 200
message: OK
version: 1
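
The two recordings pin down the expected interval result: the 2025-01-25 query returns four ids, the 2025-01-30 query returns two of them, so the harvest should yield exactly the remaining two. A quick sanity check of that arithmetic:

# Values copied from the cassette bodies above.
since_jan_25 = {2846106, 2836178, 2872775, 2872501}
since_jan_30 = {2846106, 2836178}
assert sorted(since_jan_25 - since_jan_30) == [2872501, 2872775]
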
37 changes: 27 additions & 10 deletions workflows/tests/test_data_tasks.py
@@ -11,28 +11,45 @@
@freeze_time("2024-12-11")
class TestDataHarvest:
dag = dagbag.get_dag("data_harvest_dag")
context = Context()

@pytest.mark.vcr
def test_collect_ids_param(self):
task = self.dag.get_task("collect_ids")
res = task.execute(context=Context({"params": {"last_updated": "2024-12-15"}}))
res = task.execute(
context={
"params": {"last_updated_from": "2024-12-15", "last_updated_to": ""}
}
)
assert res == [2693068, 2807749, 2809112]

@pytest.mark.vcr
def test_collect_ids_param_with_to_date(self):
task = self.dag.get_task("collect_ids")

task.op_kwargs = {
"params": {
"last_updated_from": "2025-01-25",
"last_updated_to": "2025-01-30",
}
}

res = task.execute_callable()

assert res == [2872501, 2872775]

@pytest.mark.vcr
def test_collect_ids_logical_date(self):
task = self.dag.get_task("collect_ids")
res = task.execute(
context=Context({"ds": "2024-12-16", "params": {"last_updated": ""}})
)
task.op_kwargs = {"params": {"last_updated_from": "", "last_updated_to": ""}}
res = task.execute(context=Context({"ds": "2024-12-16"}))
assert res == [2693068, 2807749, 2809112]

@pytest.mark.vcr
def test_download_record_versions(self):
id = "1906174"
task = self.dag.get_task("process_record.download_record_versions")
task.op_args = (id,)
res = task.execute(context=self.context)
res = task.execute(context={})
assert res["base"]["record"]["inspire_id"] == id
assert res["base"]["record"]["version"] == 3
assert all(value in res for value in [1, 2])
@@ -52,7 +69,7 @@ def test_build_record(self):
"inspire_url": "https://inspirehep.net",
"payload": payload,
}
res = task.execute(context=self.context)
res = task.execute(context={})
assert res["$schema"] == "data_schema"
assert res["_collections"] == ["Data"]

@@ -112,7 +129,7 @@ def test_load_record_put(self):
}
task = self.dag.get_task("process_record.load_record")
task.op_args = (record,)
json_response = task.execute(context=self.context)
json_response = task.execute(context={})
assert json_response

@pytest.mark.vcr
Expand All @@ -124,7 +141,7 @@ def test_normalize_collaborations(self):
}
task = self.dag.get_task("process_record.normalize_collaborations")
task.op_args = (record,)
json_response = task.execute(context=self.context)
json_response = task.execute(context={})

assert "record" in json_response["collaborations"][0]
assert (
@@ -146,5 +163,5 @@ def test_load_record_post(self):
}
task = self.dag.get_task("process_record.load_record")
task.op_args = (record,)
json_response = task.execute(context=self.context)
json_response = task.execute(context={})
assert json_response
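
The @pytest.mark.vcr tests replay the recorded cassettes instead of calling HEPData live. A minimal vcr_config fixture along these lines keeps CI offline; this is a sketch assuming the pytest-recording/pytest-vcr convention of resolving one cassette per test name:

# Hypothetical conftest.py snippet; vcr_config is the fixture name both
# pytest-vcr and pytest-recording look up for VCR keyword arguments.
import pytest

@pytest.fixture(scope="module")
def vcr_config():
    return {
        "record_mode": "none",  # replay only; never hit the real API in CI
        "decode_compressed_response": True,
    }
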
