Skip to content

Commit

Permalink
Add retry logic for the PACS retrieve module
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandip117 committed Jan 28, 2025
1 parent d76451d commit 22728f3
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 61 deletions.
65 changes: 65 additions & 0 deletions pfdcm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import requests
from loguru import logger
import sys
import copy
from collections import ChainMap
import json

LOG = logger.debug

logger_format = (
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> │ "
"<level>{level: <5}</level> │ "
"<yellow>{name: >28}</yellow>::"
"<cyan>{function: <30}</cyan> @"
"<cyan>{line: <4}</cyan> ║ "
"<level>{message}</level>"
)
logger.remove()
logger.add(sys.stderr, format=logger_format)

def health_check(url: str):
pfdcm_about_api = f'{url}about/'
headers = {'Content-Type': 'application/json', 'accept': 'application/json'}
try:
response = requests.get(pfdcm_about_api, headers=headers)
return response
except Exception as er:
raise Exception("Connection to pfdcm could not be established.")


def retrieve_pacsfiles(directive: dict, url: str, pacs_name: str):
"""
This method uses the async API endpoint of `pfdcm` to send a single 'retrieve' request that in
turn uses `oxidicom` to push and register PACS files to a CUBE instance
"""

pfdcm_dicom_api = f'{url}PACS/thread/pypx/'
headers = {'Content-Type': 'application/json', 'accept': 'application/json'}
body = {
"PACSservice": {
"value": pacs_name
},
"listenerService": {
"value": "default"
},
"PACSdirective": {
"withFeedBack": True,
"then": "retrieve",
"thenArgs": '',
"dblogbasepath": '/home/dicom/log',
"json_response": False
}
}
body["PACSdirective"].update(directive)
LOG(f"request : {body}")

try:
response = requests.post(pfdcm_dicom_api, json=body, headers=headers)
d_response = json.loads(response.text)
if d_response['response']['job']['status']:
return d_response
else:
raise Exception(d_response['message'])
except Exception as er:
LOG(er)
135 changes: 75 additions & 60 deletions reg_chxr.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import copy
import sys
import os
import pfdcm
import copy

LOG = logger.debug

Expand Down Expand Up @@ -41,6 +43,18 @@

parser = ArgumentParser(description='A plugin to wait till a particular set of PACS files are registered to a CUBE instance',
formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument(
'--PACSurl',
default='',
type=str,
help='endpoint URL of pfdcm'
)
parser.add_argument(
'--PACSname',
default='MINICHRISORTHANC',
type=str,
help='name of the PACS'
)
parser.add_argument(
"--CUBEurl",
default="http://localhost:8000/api/v1/",
Expand Down Expand Up @@ -92,23 +106,20 @@
help='Orthanc server url',
default='http://0.0.0.0:8042'
)

parser.add_argument(
'--orthancUsername',
dest='orthancUsername',
type=str,
help='Orthanc server username',
default='orthanc'
)

parser.add_argument(
'--orthancPassword',
dest='orthancPassword',
type=str,
help='Orthanc server password',
default='orthanc'
)

parser.add_argument(
'--pushToRemote',
dest='pushToRemote',
Expand Down Expand Up @@ -182,63 +193,8 @@ def main(options: Namespace, inputdir: Path, outputdir: Path):
if len(data) == 0:
raise Exception(f"Cannot verify registration for empty pacs data.")

status: bool = True
# for each individual series, check if total file count matches total file registered
for series in data:
pacs_search_params = sanitize_for_cube(series)
file_count = int(series["NumberOfSeriesRelatedInstances"])
registered_file_count = cube_cl.get_pacs_registered(pacs_search_params)
LOG(f"Polling for SeriesInstanceUID: {series["SeriesInstanceUID"]}")

# keep a record all the series that were not registered to retry
unregistered_series = []

# no. of times to retry unregistered series
retries = 5

# poll CUBE at regular interval for the status of file registration
poll_count = 0
total_polls = options.maxPoll
wait_poll = options.pollInterval
while registered_file_count < 1 and poll_count <= total_polls:
poll_count += 1
time.sleep(wait_poll)
registered_file_count = cube_cl.get_pacs_registered(pacs_search_params)
LOG(f"{registered_file_count} series found in CUBE.")
unregistered_series.append(pacs_search_params)

# check if polling timed out before registration is finished
if registered_file_count == 0:
LOG(f"PACS file registration unsuccessful. Please try again.")
status = False
continue
# raise Exception(f"PACS file registration unsuccessful. Please try again.")
LOG(f"{file_count} files successfully registered to CUBE.")
send_params = {
"url": options.orthancUrl,
"username": options.orthancUsername,
"password": options.orthancPassword,
"aec": options.pushToRemote
}
dicom_dir = cube_cl.get_pacs_files(pacs_search_params)

# create connection object
cube_con = ChrisClient(options.CUBEurl, options.CUBEuser, options.CUBEpassword)
cube_con.anonymize(dicom_dir, options.tagStruct,send_params, options.pluginInstanceID)

if not status:
raise Exception(f"PACS file registration unsuccessful. Please try again.")
if unregistered_series:
retry_retrieve(unregistered_series, retries)

def verify_registration(l_series: dict):
pass

def retry_retrieve(series: dict, retries: int):
retries -= 1
for item in series:
# retrieve series
pass
retry_table = create_hash_table(data, 5)
check_registration(options, retry_table, cube_cl)


def sanitize_for_cube(series: dict) -> dict:
Expand Down Expand Up @@ -269,6 +225,65 @@ def health_check(options) -> bool:
return False
return True

def create_hash_table(retrieve_data: dict, retry: int) -> dict:
retry_table: dict = {}
for series in retrieve_data:
retry_table[series["SeriesInstanceUID"]] = retry
return retry_table

def check_registration(options: Namespace, retry_table: dict, client: PACSClient):
# null check
if len(retry_table) == 0:
return

clone_retry_table = copy.deepcopy(retry_table)

for series_instance in retry_table.keys():
LOG(f"Polling CUBE for series: {series_instance}.")
registered_series_count = client.get_pacs_registered({'SeriesInstanceUID':series_instance})

# poll CUBE at regular interval for the status of file registration
poll_count: int = 0
total_polls: int = options.maxPoll
wait_poll: int = options.pollInterval
while registered_series_count < 1 and poll_count < total_polls:
poll_count += 1
time.sleep(wait_poll)
registered_series_count = client.get_pacs_registered({'SeriesInstanceUID':series_instance})
LOG(f"{registered_series_count} series found in CUBE.")

# check if polling timed out before registration is finished
if registered_series_count == 0 and clone_retry_table[series_instance] > 0:
LOG(f"PACS series registration unsuccessful. Retrying retrieve for {series_instance}.")
# retry retrieve
retrieve_response = pfdcm.retrieve_pacsfiles({'SeriesInstanceUID':series_instance},
options.PACSurl, options.PACSname)

# save retry file
srs_json_file_path = os.path.join(options.outputdir,
f"{series_instance}_retrieve_retry_{clone_retry_table[series_instance]}.json")
clone_retry_table[series_instance] -= 1
with open(srs_json_file_path, 'w', encoding='utf-8') as jsonf:
jsonf.write(json.dumps(retrieve_response, indent=4))
continue

if registered_series_count:
LOG(f"Series {series_instance} successfully registered to CUBE.")
send_params = {
"url": options.orthancUrl,
"username": options.orthancUsername,
"password": options.orthancPassword,
"aec": options.pushToRemote
}
dicom_dir = client.get_pacs_files({'SeriesInstanceUID': series_instance})

# create connection object
cube_con = ChrisClient(options.CUBEurl, options.CUBEuser, options.CUBEpassword)
cube_con.anonymize(dicom_dir, options.tagStruct, send_params, options.pluginInstanceID)
clone_retry_table.pop(series_instance)

check_registration(options, clone_retry_table, client)


if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_version(rel_path: str) -> str:
author='FNNDSC',
author_email='[email protected]',
url='https://github.com/FNNDSC/pl-reg_',
py_modules=['reg_chxr','chris_pacs_service','base_client','chrisClient','pipeline'],
py_modules=['reg_chxr','chris_pacs_service','base_client','chrisClient','pipeline','pfdcm'],
install_requires=['chris_plugin'],
license='MIT',
entry_points={
Expand Down

0 comments on commit 22728f3

Please sign in to comment.