From 22728f341e600fb4113948b3abbb9e6760a44de8 Mon Sep 17 00:00:00 2001 From: Sandip Samal Date: Tue, 28 Jan 2025 16:55:42 -0500 Subject: [PATCH] retry retrieve module --- pfdcm.py | 65 +++++++++++++++++++++++++ reg_chxr.py | 135 +++++++++++++++++++++++++++++----------------------- setup.py | 2 +- 3 files changed, 141 insertions(+), 61 deletions(-) create mode 100644 pfdcm.py diff --git a/pfdcm.py b/pfdcm.py new file mode 100644 index 0000000..6069c48 --- /dev/null +++ b/pfdcm.py @@ -0,0 +1,65 @@ +import requests +from loguru import logger +import sys +import copy +from collections import ChainMap +import json + +LOG = logger.debug + +logger_format = ( + "{time:YYYY-MM-DD HH:mm:ss} │ " + "{level: <5} │ " + "{name: >28}::" + "{function: <30} @" + "{line: <4} ║ " + "{message}" +) +logger.remove() +logger.add(sys.stderr, format=logger_format) + +def health_check(url: str): + pfdcm_about_api = f'{url}about/' + headers = {'Content-Type': 'application/json', 'accept': 'application/json'} + try: + response = requests.get(pfdcm_about_api, headers=headers) + return response + except Exception as er: + raise Exception("Connection to pfdcm could not be established.") + + +def retrieve_pacsfiles(directive: dict, url: str, pacs_name: str): + """ + This method uses the async API endpoint of `pfdcm` to send a single 'retrieve' request that in + turn uses `oxidicom` to push and register PACS files to a CUBE instance + """ + + pfdcm_dicom_api = f'{url}PACS/thread/pypx/' + headers = {'Content-Type': 'application/json', 'accept': 'application/json'} + body = { + "PACSservice": { + "value": pacs_name + }, + "listenerService": { + "value": "default" + }, + "PACSdirective": { + "withFeedBack": True, + "then": "retrieve", + "thenArgs": '', + "dblogbasepath": '/home/dicom/log', + "json_response": False + } + } + body["PACSdirective"].update(directive) + LOG(f"request : {body}") + + try: + response = requests.post(pfdcm_dicom_api, json=body, headers=headers) + d_response = json.loads(response.text) + if d_response['response']['job']['status']: + return d_response + else: + raise Exception(d_response['message']) + except Exception as er: + LOG(er) \ No newline at end of file diff --git a/reg_chxr.py b/reg_chxr.py index deeb594..91eed5e 100644 --- a/reg_chxr.py +++ b/reg_chxr.py @@ -11,6 +11,8 @@ import copy import sys import os +import pfdcm +import copy LOG = logger.debug @@ -41,6 +43,18 @@ parser = ArgumentParser(description='A plugin to wait till a particular set of PACS files are registered to a CUBE instance', formatter_class=ArgumentDefaultsHelpFormatter) +parser.add_argument( + '--PACSurl', + default='', + type=str, + help='endpoint URL of pfdcm' +) +parser.add_argument( + '--PACSname', + default='MINICHRISORTHANC', + type=str, + help='name of the PACS' +) parser.add_argument( "--CUBEurl", default="http://localhost:8000/api/v1/", @@ -92,7 +106,6 @@ help='Orthanc server url', default='http://0.0.0.0:8042' ) - parser.add_argument( '--orthancUsername', dest='orthancUsername', @@ -100,7 +113,6 @@ help='Orthanc server username', default='orthanc' ) - parser.add_argument( '--orthancPassword', dest='orthancPassword', @@ -108,7 +120,6 @@ help='Orthanc server password', default='orthanc' ) - parser.add_argument( '--pushToRemote', dest='pushToRemote', @@ -182,63 +193,8 @@ def main(options: Namespace, inputdir: Path, outputdir: Path): if len(data) == 0: raise Exception(f"Cannot verify registration for empty pacs data.") - status: bool = True - # for each individual series, check if total file count matches total file registered - for series in data: - pacs_search_params = sanitize_for_cube(series) - file_count = int(series["NumberOfSeriesRelatedInstances"]) - registered_file_count = cube_cl.get_pacs_registered(pacs_search_params) - LOG(f"Polling for SeriesInstanceUID: {series["SeriesInstanceUID"]}") - - # keep a record all the series that were not registered to retry - unregistered_series = [] - - # no. of times to retry unregistered series - retries = 5 - - # poll CUBE at regular interval for the status of file registration - poll_count = 0 - total_polls = options.maxPoll - wait_poll = options.pollInterval - while registered_file_count < 1 and poll_count <= total_polls: - poll_count += 1 - time.sleep(wait_poll) - registered_file_count = cube_cl.get_pacs_registered(pacs_search_params) - LOG(f"{registered_file_count} series found in CUBE.") - unregistered_series.append(pacs_search_params) - - # check if polling timed out before registration is finished - if registered_file_count == 0: - LOG(f"PACS file registration unsuccessful. Please try again.") - status = False - continue - # raise Exception(f"PACS file registration unsuccessful. Please try again.") - LOG(f"{file_count} files successfully registered to CUBE.") - send_params = { - "url": options.orthancUrl, - "username": options.orthancUsername, - "password": options.orthancPassword, - "aec": options.pushToRemote - } - dicom_dir = cube_cl.get_pacs_files(pacs_search_params) - - # create connection object - cube_con = ChrisClient(options.CUBEurl, options.CUBEuser, options.CUBEpassword) - cube_con.anonymize(dicom_dir, options.tagStruct,send_params, options.pluginInstanceID) - - if not status: - raise Exception(f"PACS file registration unsuccessful. Please try again.") - if unregistered_series: - retry_retrieve(unregistered_series, retries) - -def verify_registration(l_series: dict): - pass - -def retry_retrieve(series: dict, retries: int): - retries -= 1 - for item in series: - # retrieve series - pass + retry_table = create_hash_table(data, 5) + check_registration(options, retry_table, cube_cl) def sanitize_for_cube(series: dict) -> dict: @@ -269,6 +225,65 @@ def health_check(options) -> bool: return False return True +def create_hash_table(retrieve_data: dict, retry: int) -> dict: + retry_table: dict = {} + for series in retrieve_data: + retry_table[series["SeriesInstanceUID"]] = retry + return retry_table + +def check_registration(options: Namespace, retry_table: dict, client: PACSClient): + # null check + if len(retry_table) == 0: + return + + clone_retry_table = copy.deepcopy(retry_table) + + for series_instance in retry_table.keys(): + LOG(f"Polling CUBE for series: {series_instance}.") + registered_series_count = client.get_pacs_registered({'SeriesInstanceUID':series_instance}) + + # poll CUBE at regular interval for the status of file registration + poll_count: int = 0 + total_polls: int = options.maxPoll + wait_poll: int = options.pollInterval + while registered_series_count < 1 and poll_count < total_polls: + poll_count += 1 + time.sleep(wait_poll) + registered_series_count = client.get_pacs_registered({'SeriesInstanceUID':series_instance}) + LOG(f"{registered_series_count} series found in CUBE.") + + # check if polling timed out before registration is finished + if registered_series_count == 0 and clone_retry_table[series_instance] > 0: + LOG(f"PACS series registration unsuccessful. Retrying retrieve for {series_instance}.") + # retry retrieve + retrieve_response = pfdcm.retrieve_pacsfiles({'SeriesInstanceUID':series_instance}, + options.PACSurl, options.PACSname) + + # save retry file + srs_json_file_path = os.path.join(options.outputdir, + f"{series_instance}_retrieve_retry_{clone_retry_table[series_instance]}.json") + clone_retry_table[series_instance] -= 1 + with open(srs_json_file_path, 'w', encoding='utf-8') as jsonf: + jsonf.write(json.dumps(retrieve_response, indent=4)) + continue + + if registered_series_count: + LOG(f"Series {series_instance} successfully registered to CUBE.") + send_params = { + "url": options.orthancUrl, + "username": options.orthancUsername, + "password": options.orthancPassword, + "aec": options.pushToRemote + } + dicom_dir = client.get_pacs_files({'SeriesInstanceUID': series_instance}) + + # create connection object + cube_con = ChrisClient(options.CUBEurl, options.CUBEuser, options.CUBEpassword) + cube_con.anonymize(dicom_dir, options.tagStruct, send_params, options.pluginInstanceID) + clone_retry_table.pop(series_instance) + + check_registration(options, clone_retry_table, client) + if __name__ == '__main__': main() diff --git a/setup.py b/setup.py index faa1f8c..a2d4a90 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ def get_version(rel_path: str) -> str: author='FNNDSC', author_email='dev@babyMRI.org', url='https://github.com/FNNDSC/pl-reg_', - py_modules=['reg_chxr','chris_pacs_service','base_client','chrisClient','pipeline'], + py_modules=['reg_chxr','chris_pacs_service','base_client','chrisClient','pipeline','pfdcm'], install_requires=['chris_plugin'], license='MIT', entry_points={