From 0a23de9b7e656a68f20034a28aa7c44869353efb Mon Sep 17 00:00:00 2001
From: DonHaul
Date: Fri, 31 Jan 2025 17:09:32 +0100
Subject: [PATCH] workflows[data]: accept to_date in harvest interval

* ref: cern-sis/issues-inspire/issues/739
---
 workflows/dags/data/data_harvest.py           | 22 +++-
 ...t.test_collect_ids_param_with_to_date.yaml | 120 ++++++++++++++++++
 workflows/tests/test_data_tasks.py            | 37 ++++--
 3 files changed, 165 insertions(+), 14 deletions(-)
 create mode 100644 workflows/tests/cassettes/TestDataHarvest.test_collect_ids_param_with_to_date.yaml

diff --git a/workflows/dags/data/data_harvest.py b/workflows/dags/data/data_harvest.py
index 0c81906192..dd784509d5 100644
--- a/workflows/dags/data/data_harvest.py
+++ b/workflows/dags/data/data_harvest.py
@@ -21,8 +21,10 @@
     schedule="@daily",
     catchup=False,
     tags=["data"],
-    max_active_runs=5,
-    params={"last_updated": Param(type=["null", "string"], default="")},
+    params={
+        "last_updated_from": Param(type=["null", "string"], default=""),
+        "last_updated_to": Param(type=["null", "string"], default=""),
+    },
 )
 def data_harvest_dag():
     """Defines the DAG for the HEPData harvest workflow.
@@ -48,14 +50,26 @@ def collect_ids(**context):
 
         Returns: list of ids
 
         """
         from_date = (
-            context["params"]["last_updated"]
-            if context["params"]["last_updated"]
+            context["params"]["last_updated_from"]
+            if context["params"]["last_updated_from"]
             else ds_add(context["ds"], -1)
         )
+        to_date = context["params"]["last_updated_to"]
+
         payload = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
         hepdata_response = generic_http_hook.call_api(
             endpoint="/search/ids", method="GET", params=payload
         )
+        if to_date:
+            payload = {
+                "inspire_ids": True,
+                "last_updated": to_date,
+                "sort_by": "latest",
+            }
+            hepdata_to_response = generic_http_hook.call_api(
+                endpoint="/search/ids", method="GET", params=payload
+            )
+            return sorted(set(hepdata_response.json()) - set(hepdata_to_response.json()))
         return hepdata_response.json()
diff --git a/workflows/tests/cassettes/TestDataHarvest.test_collect_ids_param_with_to_date.yaml b/workflows/tests/cassettes/TestDataHarvest.test_collect_ids_param_with_to_date.yaml
new file mode 100644
index 0000000000..8eb2fd2d21
--- /dev/null
+++ b/workflows/tests/cassettes/TestDataHarvest.test_collect_ids_param_with_to_date.yaml
@@ -0,0 +1,120 @@
+interactions:
+- request:
+    body: null
+    headers:
+      Accept:
+      - '*/*'
+      Accept-Encoding:
+      - gzip, deflate
+      Connection:
+      - keep-alive
+    method: GET
+    uri: https://www.hepdata.net/search/ids?inspire_ids=True&last_updated=2025-01-25&sort_by=latest
+  response:
+    body:
+      string: '[2846106,2836178,2872775,2872501]
+
+        '
+    headers:
+      access-control-allow-origin:
+      - '*'
+      alt-svc:
+      - h3=":443";ma=60;
+      content-length:
+      - '34'
+      content-type:
+      - application/json
+      date:
+      - Mon, 03 Feb 2025 13:22:37 GMT
+      permissions-policy:
+      - interest-cohort=()
+      referrer-policy:
+      - strict-origin-when-cross-origin
+      retry-after:
+      - '60'
+      server:
+      - gunicorn
+      set-cookie:
+      - session=ae88aea732751134_67a0c31d.p3L-3XEtP37iCxKBijecugLw_-o; Domain=.www.hepdata.net;
+        Expires=Tue,
04 Feb 2025 01:22:37 GMT; HttpOnly; Path=/; SameSite=Lax + transfer-encoding: + - chunked + vary: + - Accept-Encoding + x-content-type-options: + - nosniff + x-frame-options: + - sameorigin + x-proxy-backend: + - hepdata-prod_hepdata-web_http + x-ratelimit-limit: + - '60' + x-ratelimit-remaining: + - '59' + x-ratelimit-reset: + - '1738589018' + x-xss-protection: + - 1; mode=block + status: + code: 200 + message: OK +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + method: GET + uri: https://www.hepdata.net/search/ids?inspire_ids=True&last_updated=2025-01-30&sort_by=latest + response: + body: + string: '[2846106,2836178] + + ' + headers: + access-control-allow-origin: + - '*' + alt-svc: + - h3=":443";ma=60; + content-length: + - '18' + content-type: + - application/json + date: + - Mon, 03 Feb 2025 13:22:37 GMT + permissions-policy: + - interest-cohort=() + referrer-policy: + - strict-origin-when-cross-origin + retry-after: + - '60' + server: + - gunicorn + set-cookie: + - session=391a8555c6446562_67a0c31d.5rDsKmvxvMxWODzSsA1tP-uGKMI; Domain=.www.hepdata.net; + Expires=Tue, 04 Feb 2025 01:22:37 GMT; HttpOnly; Path=/; SameSite=Lax + transfer-encoding: + - chunked + vary: + - Accept-Encoding + x-content-type-options: + - nosniff + x-frame-options: + - sameorigin + x-proxy-backend: + - hepdata-prod_hepdata-web_http + x-ratelimit-limit: + - '60' + x-ratelimit-remaining: + - '58' + x-ratelimit-reset: + - '1738589018' + x-xss-protection: + - 1; mode=block + status: + code: 200 + message: OK +version: 1 diff --git a/workflows/tests/test_data_tasks.py b/workflows/tests/test_data_tasks.py index a030480810..fc00c8e24a 100644 --- a/workflows/tests/test_data_tasks.py +++ b/workflows/tests/test_data_tasks.py @@ -11,20 +11,37 @@ @freeze_time("2024-12-11") class TestDataHarvest: dag = dagbag.get_dag("data_harvest_dag") - context = Context() @pytest.mark.vcr def test_collect_ids_param(self): task = 
self.dag.get_task("collect_ids") - res = task.execute(context=Context({"params": {"last_updated": "2024-12-15"}})) + res = task.execute( + context={ + "params": {"last_updated_from": "2024-12-15", "last_updated_to": ""} + } + ) assert res == [2693068, 2807749, 2809112] + @pytest.mark.vcr + def test_collect_ids_param_with_to_date(self): + task = self.dag.get_task("collect_ids") + + task.op_kwargs = { + "params": { + "last_updated_from": "2025-01-25", + "last_updated_to": "2025-01-30", + } + } + + res = task.execute_callable() + + assert res == [2872501, 2872775] + @pytest.mark.vcr def test_collect_ids_logical_date(self): task = self.dag.get_task("collect_ids") - res = task.execute( - context=Context({"ds": "2024-12-16", "params": {"last_updated": ""}}) - ) + task.op_kwargs = {"params": {"last_updated_from": "", "last_updated_to": ""}} + res = task.execute(context=Context({"ds": "2024-12-16"})) assert res == [2693068, 2807749, 2809112] @pytest.mark.vcr @@ -32,7 +49,7 @@ def test_download_record_versions(self): id = "1906174" task = self.dag.get_task("process_record.download_record_versions") task.op_args = (id,) - res = task.execute(context=self.context) + res = task.execute(context={}) assert res["base"]["record"]["inspire_id"] == id assert res["base"]["record"]["version"] == 3 assert all(value in res for value in [1, 2]) @@ -52,7 +69,7 @@ def test_build_record(self): "inspire_url": "https://inspirehep.net", "payload": payload, } - res = task.execute(context=self.context) + res = task.execute(context=Context()) assert res["$schema"] == "data_schema" assert res["_collections"] == ["Data"] @@ -112,7 +129,7 @@ def test_load_record_put(self): } task = self.dag.get_task("process_record.load_record") task.op_args = (record,) - json_response = task.execute(context=self.context) + json_response = task.execute(context={}) assert json_response @pytest.mark.vcr @@ -124,7 +141,7 @@ def test_normalize_collaborations(self): } task = 
self.dag.get_task("process_record.normalize_collaborations") task.op_args = (record,) - json_response = task.execute(context=self.context) + json_response = task.execute(context={}) assert "record" in json_response["collaborations"][0] assert ( @@ -146,5 +163,5 @@ def test_load_record_post(self): } task = self.dag.get_task("process_record.load_record") task.op_args = (record,) - json_response = task.execute(context=self.context) + json_response = task.execute(context=Context({})) assert json_response