Skip to content

Commit

Permalink
Rework for modifying OBJECT_STORAGE locations after GET from archive
Browse files Browse the repository at this point in the history
  • Loading branch information
nmassey001 committed Oct 22, 2024
1 parent 26369b5 commit 34b6dfe
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions nlds_processors/nlds_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,36 @@ def _process_rk_catalog_get_complete(self, rk_parts: List, body_json: Dict) -> N
self.publish_and_log_message(new_routing_key, body_json)

def _process_rk_archive_get_complete(self, rk_parts: List, body_json: Dict) -> None:
# Can simply call the same process used at catalog_get complete
self._process_rk_catalog_get_complete(rk_parts, body_json)
# After a successful ARCHIVE_GET, the catalog is updated with the locations
# of the files on the OBJECT STORAGE
self.log(
f"Archive get successful, sending filelist with object storage locations "
"to be modified in the catalog",
RK.LOG_INFO,
)
queue = f"{RK.CATALOG_UPDATE}"
new_routing_key = ".".join([RK.ROOT, queue, RK.START])
self.log(
f"Sending message to {queue} queue with routing key {new_routing_key}",
RK.LOG_INFO,
)
self.publish_and_log_message(new_routing_key, body_json)

def _process_rk_catalog_update_complete(
self, rk_parts: List, body_json: Dict
) -> None:
# After a successful CATALOG_UPDATE, as part of the GET workflow, the files
# are get from the Object Store using TRANSFER_GET
# this is the same as the when catalog get is complete
try:
api_method = body_json[MSG.DETAILS][MSG.API_ACTION]
if api_method == RK.GET or api_method == RK.GETLIST:
self._process_rk_catalog_get_complete(rk_parts, body_json)
except KeyError:
self.log(
f"Message did not contain an appropriate api_action.",
RK.LOG_ERROR,
)

def _process_rk_catalog_get_archive_restore(
self, rk_parts: List, body_json: Dict
Expand All @@ -204,8 +232,7 @@ def _process_rk_catalog_get_archive_restore(
queue = RK.ARCHIVE_GET
new_routing_key = ".".join([RK.ROOT, queue, RK.START])
self.log(
f"Sending message to {queue} queue with routing key "
f"{new_routing_key}",
f"Sending message to {queue} queue with routing key " f"{new_routing_key}",
RK.LOG_INFO,
)
self.publish_and_log_message(new_routing_key, body_json)
Expand All @@ -217,7 +244,7 @@ def _process_rk_archive_get_failed(self, body_json: Dict) -> None:
RK.LOG_INFO,
)

queue = f"{RK.CATALOG_ARCHIVE_REMOVE}"
queue = f"{RK.CATALOG_REMOVE}"
new_routing_key = ".".join([RK.ROOT, queue, RK.START])
self.log(
f"Sending message to {queue} queue with routing key {new_routing_key}",
Expand Down Expand Up @@ -286,7 +313,7 @@ def _process_rk_archive_put_failed(self, body_json: Dict) -> None:
RK.LOG_INFO,
)

queue = f"{RK.CATALOG_ARCHIVE_REMOVE}"
queue = f"{RK.CATALOG_REMOVE}"
new_routing_key = ".".join([RK.ROOT, queue, RK.START])
self.log(
f"Sending message to {queue} queue with routing key {new_routing_key}",
Expand All @@ -309,7 +336,7 @@ def callback(
self.log(
f"Received with routing_key: {method.routing_key}",
RK.LOG_INFO,
body_json=body_json
body_json=body_json,
)

# If received system test message, reply to it (this is for system status check)
Expand Down Expand Up @@ -344,14 +371,15 @@ def callback(
elif rk_parts[1] == f"{RK.CATALOG_GET}":
self._process_rk_catalog_get_complete(rk_parts, body_json)

# If finished with archive retrieval then pass for transfer-get
# If finished with archive retrieval then pass for catalog-update
elif rk_parts[1] == f"{RK.ARCHIVE_GET}":
self._process_rk_archive_get_complete(rk_parts, body_json)

# If finished with aggregation of unarchived holding, then send for
# archive write
elif rk_parts[1] == RK.CATALOG_ARCHIVE_NEXT:
self._process_rk_catalog_archive_next_complete(rk_parts, body_json)

# If finished with archive write, then pass checksum info to catalog
elif rk_parts[1] == f"{RK.ARCHIVE_PUT}":
self._process_rk_archive_put_complete(rk_parts, body_json)
Expand Down

0 comments on commit 34b6dfe

Please sign in to comment.