Skip to content

Commit

Permalink
Factored some parts of callback to _callback_common
Browse files (browse the repository at this point in the history)
  • Loading branch information
nmassey001 committed Oct 9, 2024
1 parent b210e87 commit f23c152
Showing 1 changed file with 41 additions and 32 deletions.
73 changes: 41 additions & 32 deletions nlds_processors/transferers/base_transfer.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -31,14 +31,11 @@ class BaseTransferConsumer(StattingConsumer, ABC):
_TENANCY = "tenancy"
_REQUIRE_SECURE = "require_secure_fl"
_PRINT_TRACEBACKS = "print_tracebacks_fl"
_MAX_RETRIES = "max_retries"
_FILELIST_MAX_LENGTH = "filelist_max_length"
_REMOVE_ROOT_SLASH = "remove_root_slash_fl"
DEFAULT_CONSUMER_CONFIG = {
_TENANCY: None,
_REQUIRE_SECURE: True,
_PRINT_TRACEBACKS: False,
_MAX_RETRIES: 5,
_FILELIST_MAX_LENGTH: 1000,
}

Expand All @@ -48,30 +45,31 @@ def __init__(self, queue=DEFAULT_QUEUE_NAME):
self.tenancy = self.load_config_value(self._TENANCY)
self.require_secure_fl = self.load_config_value(self._REQUIRE_SECURE)
self.print_tracebacks_fl = self.load_config_value(self._PRINT_TRACEBACKS)
self.max_retries = self.load_config_value(self._MAX_RETRIES)
self.filelist_max_len = self.load_config_value(self._FILELIST_MAX_LENGTH)

self.reset()

def callback(self, ch, method, properties, body, connection):
def _callback_common(self, cm, method, properties, body, connection):
self.reset()

# Convert body from bytes to string for ease of manipulation
body = body.decode("utf-8")
body_json = json.loads(body)
self.body = body.decode("utf-8")
self.body_json = json.loads(self.body)

if self._is_system_status_check(body_json=body_json, properties=properties):
return
if self._is_system_status_check(
body_json=self.body_json, properties=properties
):
return False

self.log(
f"Received {json.dumps(body_json, indent=4)} from "
f"Received {json.dumps(self.body_json, indent=4)} from "
f"{self.queues[0].name} ({method.routing_key})",
RK.LOG_DEBUG,
)

# Verify routing key is appropriate
try:
rk_parts = self.split_routing_key(method.routing_key)
self.rk_parts = self.split_routing_key(method.routing_key)
except ValueError as e:
self.log(
"Routing key inappropriate length, exiting callback.", RK.LOG_ERROR
Expand All @@ -81,60 +79,71 @@ def callback(self, ch, method, properties, body, connection):
###
# Verify and load message contents
try:
transaction_id = body_json[MSG.DETAILS][MSG.TRANSACT_ID]
self.transaction_id = self.body_json[MSG.DETAILS][MSG.TRANSACT_ID]
except KeyError:
self.log("Transaction id unobtainable, exiting callback.", RK.LOG_ERROR)
return

try:
filelist = self.parse_filelist(body_json)
self.filelist = self.parse_filelist(self.body_json)
except TypeError as e:
self.log("Filelist not parseable, exiting callback", RK.LOG_ERROR)
return

try:
access_key, secret_key, tenancy = self.get_objectstore_config(body_json)
(self.access_key, self.secret_key, self.tenancy) = (
self.get_objectstore_config(self.body_json)
)
except TransferError:
self.log("Objectstore config unobtainable, exiting callback.", RK.LOG_ERROR)
return

# Set uid and gid from message contents
self.set_ids(body_json)
self.log("Setting uid and gids now.", RK.LOG_INFO)
self.set_ids(self.body_json)

# Append route info to message to track the route of the message
body_json = self.append_route_info(body_json)
self.body_json = self.append_route_info(self.body_json)
return True

if rk_parts[2] == RK.INITIATE:
def callback(self, ch, method, properties, body, connection):

if not self._callback_common(ch, method, properties, body, connection):
return

if self.rk_parts[2] == RK.INITIATE:
self.log(
"Aggregating list into more appropriately sized sub-lists for "
"parallelised uploads.",
RK.LOG_INFO,
RK.LOG_INFO
)

# Make a new routing key which returns message to this queue
rk_transfer_start = ".".join([rk_parts[0], rk_parts[1], RK.START])
rk_transfer_start = ".".join([self.rk_parts[0], self.rk_parts[1], RK.START])
# Aggregate files into bins of approximately equal size and split
# the transaction into subtransactions to allow parallel transfers
sub_lists = bin_files(filelist)
sub_lists = bin_files(self.filelist)
for sub_list in sub_lists:
self.send_pathlist(
sub_list, rk_transfer_start, body_json, state=State.INITIALISING
sub_list,
rk_transfer_start,
self.body_json,
state=State.INITIALISING,
)
elif rk_parts[2] == RK.START:
elif self.rk_parts[2] == RK.START:
# Start transfer - this is implementation specific and handled by
# child classes
self.log(f"Starting object store transfer with {tenancy}", RK.LOG_INFO)
self.log(f"Starting object store transfer with {self.tenancy}", RK.LOG_INFO)
self.transfer(
transaction_id,
tenancy,
access_key,
secret_key,
filelist,
rk_parts[0],
body_json,
self.transaction_id,
self.tenancy,
self.access_key,
self.secret_key,
self.filelist,
self.rk_parts[0],
self.body_json,
)
else:
raise TransferError(f"Unknown routing key {rk_parts[2]}")
raise TransferError(f"Unknown routing key {self.rk_parts[2]}")

def get_objectstore_config(self, body_dict) -> Tuple:
try:
Expand Down

0 comments on commit f23c152

Please sign in to comment.