diff --git a/nlds_processors/nlds_worker.py b/nlds_processors/nlds_worker.py index 6e8d2db5..12303ab9 100644 --- a/nlds_processors/nlds_worker.py +++ b/nlds_processors/nlds_worker.py @@ -185,8 +185,36 @@ def _process_rk_catalog_get_complete(self, rk_parts: List, body_json: Dict) -> N self.publish_and_log_message(new_routing_key, body_json) def _process_rk_archive_get_complete(self, rk_parts: List, body_json: Dict) -> None: - # Can simply call the same process used at catalog_get complete - self._process_rk_catalog_get_complete(rk_parts, body_json) + # After a successful ARCHIVE_GET, the catalog is updated with the locations + # of the files on the OBJECT STORAGE + self.log( + f"Archive get successful, sending filelist with object storage locations " + "to be modified in the catalog", + RK.LOG_INFO, + ) + queue = f"{RK.CATALOG_UPDATE}" + new_routing_key = ".".join([RK.ROOT, queue, RK.START]) + self.log( + f"Sending message to {queue} queue with routing key {new_routing_key}", + RK.LOG_INFO, + ) + self.publish_and_log_message(new_routing_key, body_json) + + def _process_rk_catalog_update_complete( + self, rk_parts: List, body_json: Dict + ) -> None: + # After a successful CATALOG_UPDATE, as part of the GET workflow, the files + # are get from the Object Store using TRANSFER_GET + # this is the same as the when catalog get is complete + try: + api_method = body_json[MSG.DETAILS][MSG.API_ACTION] + if api_method == RK.GET or api_method == RK.GETLIST: + self._process_rk_catalog_get_complete(rk_parts, body_json) + except KeyError: + self.log( + f"Message did not contain an appropriate api_action.", + RK.LOG_ERROR, + ) def _process_rk_catalog_get_archive_restore( self, rk_parts: List, body_json: Dict @@ -204,8 +232,7 @@ def _process_rk_catalog_get_archive_restore( queue = RK.ARCHIVE_GET new_routing_key = ".".join([RK.ROOT, queue, RK.START]) self.log( - f"Sending message to {queue} queue with routing key " - f"{new_routing_key}", + f"Sending message to {queue} queue with routing key " f"{new_routing_key}", RK.LOG_INFO, ) self.publish_and_log_message(new_routing_key, body_json) @@ -217,7 +244,7 @@ def _process_rk_archive_get_failed(self, body_json: Dict) -> None: RK.LOG_INFO, ) - queue = f"{RK.CATALOG_ARCHIVE_REMOVE}" + queue = f"{RK.CATALOG_REMOVE}" new_routing_key = ".".join([RK.ROOT, queue, RK.START]) self.log( f"Sending message to {queue} queue with routing key {new_routing_key}", @@ -286,7 +313,7 @@ def _process_rk_archive_put_failed(self, body_json: Dict) -> None: RK.LOG_INFO, ) - queue = f"{RK.CATALOG_ARCHIVE_REMOVE}" + queue = f"{RK.CATALOG_REMOVE}" new_routing_key = ".".join([RK.ROOT, queue, RK.START]) self.log( f"Sending message to {queue} queue with routing key {new_routing_key}", @@ -309,7 +336,7 @@ def callback( self.log( f"Received with routing_key: {method.routing_key}", RK.LOG_INFO, - body_json=body_json + body_json=body_json, ) # If received system test message, reply to it (this is for system status check) @@ -344,7 +371,7 @@ def callback( elif rk_parts[1] == f"{RK.CATALOG_GET}": self._process_rk_catalog_get_complete(rk_parts, body_json) - # If finished with archive retrieval then pass for transfer-get + # If finished with archive retrieval then pass for catalog-update elif rk_parts[1] == f"{RK.ARCHIVE_GET}": self._process_rk_archive_get_complete(rk_parts, body_json) @@ -352,6 +379,7 @@ def callback( # archive write elif rk_parts[1] == RK.CATALOG_ARCHIVE_NEXT: self._process_rk_catalog_archive_next_complete(rk_parts, body_json) + # If finished with archive write, then pass checksum info to catalog elif rk_parts[1] == f"{RK.ARCHIVE_PUT}": self._process_rk_archive_put_complete(rk_parts, body_json)