Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes from development #125

Merged
merged 29 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
671e1d0
Started writing nlds_monitor.py
Will-Cross1 Oct 7, 2024
32c6e67
Added further functionality
Will-Cross1 Oct 7, 2024
dcae95c
Added more features and tidied up the code
Will-Cross1 Oct 8, 2024
ee2cfd4
Removed excess whitespace
Will-Cross1 Oct 9, 2024
83230c0
Split the query_monitor_db function and added a new filter
Will-Cross1 Oct 10, 2024
ddbfb04
Added a pytest for nlds_monitor
Will-Cross1 Oct 15, 2024
f0e36c4
Merge branch 'development' into monitor-tools
Will-Cross1 Oct 15, 2024
d53884c
Finished all of the pytests
Will-Cross1 Oct 17, 2024
3c84ea3
Formatted the files with black
Will-Cross1 Oct 21, 2024
398005d
Updated click function to send functionality to other functions
Will-Cross1 Oct 22, 2024
58f276c
Added job label to complex view, changed creation time to last update…
Will-Cross1 Oct 23, 2024
d23b822
Merge pull request #112 from cedadev/monitor-tools
nmassey001 Jan 27, 2025
1da2a25
Corrected some typos and information about where staging object store is
nmassey001 Jan 30, 2025
6711964
Added option to specify where CONFIG_FILE_LOCATION is when running mo…
nmassey001 Jan 30, 2025
977b158
Add settings to reset_storage_status
nmassey001 Feb 4, 2025
61379a9
Improved error message for Location URL not being empty
nmassey001 Feb 5, 2025
f704662
Improved error checking
nmassey001 Feb 13, 2025
41599b4
Better error handling
nmassey001 Feb 13, 2025
469c3a4
Better error handling
nmassey001 Feb 13, 2025
ef4f628
Filter on tenancy when doing next archive
nmassey001 Feb 13, 2025
c3b4cba
Filter on tenancy when doing next archive
nmassey001 Feb 13, 2025
a6f2b47
Formatting
nmassey001 Feb 13, 2025
365ac20
Better error handling
nmassey001 Feb 13, 2025
ceba30d
Better error handling
nmassey001 Feb 13, 2025
8534111
Fixed get_files always returning only one file. Fixed duplicate label…
nmassey001 Feb 17, 2025
9107fce
Use newest_only when getting a file
nmassey001 Feb 17, 2025
6a13eaf
Fixed infinite loop in _get_parent_dirs and only try to change direct…
nmassey001 Feb 17, 2025
0395457
extra config option for NLDS user
nmassey001 Feb 17, 2025
fa61e10
Merge branch 'main' into development
nmassey001 Feb 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ manage the dependencies.
2. Activate the nlds-venv:

```
source nlds-venv/bin/activate
source ~/nlds-venv/bin/activate
```

3. You could either install the nlds package with editing capability from a
Expand Down
Empty file added conftest.py
Empty file.
6 changes: 3 additions & 3 deletions docs/source/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,11 @@ everything on this page, this was true at the time of writing (2024-03-06).
- on ``db5`` - ``nlds_{db_name}_staging``
- on ``db5`` - ``nlds_{db_name}``
* - Logging
- To ``fluentbit`` with tags ``nlds_statging_{service_name}_log``
- To ``fluentbit`` with tags ``nlds_staging_{service_name}_log``
- To ``fluentbit`` with tags ``nlds_prod_{service_name}_log``
* - Object store
- Uses the ``cedaproc-o`` tenancy
- Uses ``nlds-cache-02-o`` tenancy, ``nlds-cache-01-o`` also available
- Uses the ``nlds-staging-o`` tenancy, ``(50TB)`` available
- Uses ``nlds-cache-01-o`` tenancy, ``(2PB)`` available
* - API Server
- `https://nlds-master.130.246.130.221.nip.io/ <https://nlds-master.130.246.130.221.nip.io/docs>`_ (firewalled)
- `https://nlds.jasmin.ac.uk/ <https://nlds.jasmin.ac.uk/docs>`_ (public, ssl secured)
Expand Down
74 changes: 46 additions & 28 deletions nlds/authenticators/authenticate_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,50 +48,68 @@ async def authenticate_token(token: str = Depends(oauth2_scheme)):
async def authenticate_user(user: str, token: str = Depends(oauth2_scheme)):
"""Check the user by calling the authenticator's authenticate_user
method."""
if token is None:
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_user"],
msg = f"OAuth token not supplied.",
type = "Forbidden."
)
try:
if token is None:
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_user"],
msg = f"OAuth token not supplied.",
type = "Forbidden."
)
raise HTTPException(
status_code = status.HTTP_403_FORBIDDEN,
detail = response_error.json()
)
elif not authenticator.authenticate_user(token, user):
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_user"],
msg = f"User {user} could not be found.",
type = "Forbidden."
)
raise HTTPException(
status_code = status.HTTP_404_NOT_FOUND,
detail = response_error.json()
)
except RuntimeError as e:
raise HTTPException(
status_code = status.HTTP_403_FORBIDDEN,
detail = response_error.json()
)
elif not authenticator.authenticate_user(token, user):
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_user"],
msg = f"User {user} could not be found.",
type = "Forbidden."
)
raise HTTPException(
status_code = status.HTTP_404_NOT_FOUND,
detail = response_error.json()
)
return user


async def authenticate_group(group: str, token: str = Depends(oauth2_scheme)):
"""Check the group by calling the authenticator's authenticate_user
method."""
if token is None:
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_group"],
msg = "OAuth token not supplied.",
type = "Forbidden."
)
raise HTTPException(
status_code = status.HTTP_403_FORBIDDEN,
detail = response_error.json()
)
elif not authenticator.authenticate_group(token, group):
try:
if token is None:
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_group"],
msg = "OAuth token not supplied.",
type = "Forbidden."
)
raise HTTPException(
status_code = status.HTTP_403_FORBIDDEN,
detail = response_error.json()
)
elif not authenticator.authenticate_group(token, group):
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_group"],
msg = f"User is not a member of the group {group}.",
type = "Resource not found."
)
raise HTTPException(
status_code = status.HTTP_404_NOT_FOUND,
detail = response_error.json()
)
except RuntimeError as e:
response_error = ResponseError(
loc = ["authenticate_methods", "authenticate_group"],
msg = f"User is not a member of the group {group}.",
type = "Resource not found."
)
raise HTTPException(
status_code = status.HTTP_404_NOT_FOUND,
status_code = status.HTTP_403_FORBIDDEN,
detail = response_error.json()
)

return group
21 changes: 12 additions & 9 deletions nlds/rabbit/statting_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,17 @@ def check_path_access(
if not isinstance(path, pth.Path):
raise ValueError("No valid path object was given.")

if not path.exists():
# Can't access or stat something that doesn't exist
try:
if not path.exists():
# Can't access or stat something that doesn't exist
check_path = False
else:
# If no stat result is passed through then get our own
if stat_result is None:
stat_result = path.stat()
check_path = check_permissions(
self.uid, self.gids, access=access, stat_result=stat_result
)
except PermissionError:
check_path = False
else:
# If no stat result is passed through then get our own
if stat_result is None:
stat_result = path.stat()
check_path = check_permissions(
self.uid, self.gids, access=access, stat_result=stat_result
)
return check_path
4 changes: 2 additions & 2 deletions nlds_processors/archive/s3_to_tarfile_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from hashlib import shake_256
from typing import List
from urllib3.exceptions import HTTPError
from urllib3.exceptions import HTTPError, MaxRetryError
import tarfile
from datetime import datetime
from abc import abstractmethod
Expand Down Expand Up @@ -89,7 +89,7 @@ def _check_files_exist(self):
)
failed_list.append(path_details)
continue
except (S3Error, HTTPError) as e:
except (S3Error, HTTPError, MaxRetryError) as e:
path_details.failure_reason = (
f"Could not verify that bucket {check_bucket} exists before "
f"writing to tape. Original exception: {e}"
Expand Down
45 changes: 21 additions & 24 deletions nlds_processors/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ def modify_holding(
holding.label = new_label
self.session.flush()
except IntegrityError:
# rollback so we can access the holding
self.session.rollback()
raise CatalogError(
f"Cannot change holding with label:{holding.label} and "
f"holding_id:{holding.id} to new label:{new_label}. New "
Expand Down Expand Up @@ -318,6 +320,7 @@ def get_files(
transaction_id: str = None,
original_path: str = None,
tag: dict = None,
newest_only: bool = False
) -> list:
"""Get a multitude of file details from the database, given the user,
group, label, holding_id, path (can be regex) or tag(s)"""
Expand All @@ -340,21 +343,25 @@ def get_files(
search_path = ".*"

# (permissions have been checked by get_holding)
file_dict = {}
file_list = []
try:
for h in holding:
# build the file query bit by bit
file_q = self.session.query(File, Transaction).filter(
File.transaction_id == Transaction.id,
Transaction.holding_id == h.id,
)
).order_by(Transaction.ingest_time)
if is_regex(search_path):
file_q = file_q.filter(File.original_path.regexp_match(search_path))
else:
file_q = file_q.filter(File.original_path == search_path)

result = file_q.all()

            # only want one file if newest_only is set (this is for downloading
            # when only the path is specified and no holding id or label is given);
            # the results have been ordered by the Transaction ingest time
if newest_only:
result = [result[0]]
for r in result:
if r.File is None:
continue
Expand All @@ -366,26 +373,8 @@ def get_files(
f"User:{user} in group:{group} does not have permission to "
f"access the file with original path:{r.File.original_path}."
)
file_list.append(r.File)

# if the file exists in more than one holding then it will appear
# in the results list more than once. we want to return the newest
# so we will build a dictionary indexed by the original path and
# compare the ingest times as to whether to replace the entry in the
# dictionary
if r.File.original_path in file_dict:
curr_ingest_time = (
file_dict[r.File.original_path][1].ingest_time
)
file_ingest_time = r.Transaction.ingest_time
if (file_ingest_time > curr_ingest_time):
file_dict[r.File.original_path] = (r.File, r.Transaction)

else:
file_dict[r.File.original_path] = (r.File, r.Transaction)

# convert file_dict (which also contains transactions) to a list of File
# objects
file_list = [fd[0] for _, fd in file_dict.items()]
# no files found
if len(file_list) == 0:
raise KeyError
Expand Down Expand Up @@ -681,9 +670,15 @@ def delete_aggregation(self, aggregation: Aggregation) -> None:
)
raise CatalogError(err_msg)

def get_next_unarchived_holding(self) -> Holding:
def get_next_unarchived_holding(self, tenancy: str) -> Holding:
"""The principal function for getting the next unarchived holding to
archive aggregate."""
archive aggregate.
A tenancy is passed in so that the only holdings attempted to be backed up are
those that can be accessed via the object store keys also passed in the body.
Otherwise, when the archive_put process tries to stream the files from the
object store to the tape, the keys don't match the tenancy and an access denied
error is produced.
"""
if self.session is None:
raise RuntimeError("self.session is None")
try:
Expand Down Expand Up @@ -711,6 +706,8 @@ def get_next_unarchived_holding(self) -> Holding:
File.locations.any(Location.storage_type == Storage.OBJECT_STORAGE),
)
.order_by(Holding.id)
# tenancy is stored in url_netloc part of Location
.filter(Location.url_netloc == tenancy)
.first()
)

Expand Down
17 changes: 10 additions & 7 deletions nlds_processors/catalog/catalog_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,6 @@ def _get_or_create_holding(
):
"""Get a holding via label or holding_id or transaction_id.
If the holding doesn't already exist then create it."""
self.log(
f"!!!! HOLDING: {holding_id}, LABEL: {search_label}, TRANSID: {transaction_id}", RK.LOG_DEBUG
)
# try to get the holding to see if it already exists and can be added to
try:
# don't use tags to search - they are strictly for adding to the holding
Expand Down Expand Up @@ -382,9 +379,7 @@ def _get_or_create_holding(
raise e
else:
if len(holding) > 1:
raise CatalogError(
f"More than one holding found for label {new_label}"
)
raise CatalogError(f"More than one holding found for label {new_label}")
else:
holding = holding[0]
return holding
Expand Down Expand Up @@ -717,6 +712,7 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None:
transaction_id=transaction_id,
original_path=filepath_details.original_path,
tag=holding_tag,
newest_only=True
)

if len(files) == 0:
Expand Down Expand Up @@ -844,12 +840,19 @@ def _filemodel_to_path_details(self, file: File):
def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None:
"""Get the next holding for archiving, create a new location for it and pass it
for aggregating to the Archive Put process."""

try:
tenancy = self._parse_tenancy(body)
except CatalogError:
# functions above handled message logging, here we just return
return

# start the database transactions
self.catalog.start_session()

# Get the next holding in the catalog, by id, which has any unarchived
# Files, i.e. any files which don't have a tape location
next_holding = self.catalog.get_next_unarchived_holding()
next_holding = self.catalog.get_next_unarchived_holding(tenancy)

# If no holdings left to archive then end the callback
if not next_holding:
Expand Down
6 changes: 3 additions & 3 deletions nlds_processors/transfer/base_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BaseTransferConsumer(StattingConsumer, ABC):
_REQUIRE_SECURE: True,
_PRINT_TRACEBACKS: False,
_FILELIST_MAX_LENGTH: 1000,
StattingConsumer._FILELIST_MAX_SIZE: 16 * 1000 * 1000
StattingConsumer._FILELIST_MAX_SIZE: 16 * 1000 * 1000,
}

def __init__(self, queue=DEFAULT_QUEUE_NAME):
Expand Down Expand Up @@ -65,7 +65,7 @@ def _callback_common(self, cm, method, properties, body, connection):
self.log(
f"Received from {self.queues[0].name} ({method.routing_key})",
RK.LOG_DEBUG,
body_json=self.body_json
body_json=self.body_json,
)

# Verify routing key is appropriate
Expand Down Expand Up @@ -141,7 +141,7 @@ def callback(self, ch, method, properties, body, connection):
self.log(
"Aggregating list into more appropriately sized sub-lists for "
"parallelised uploads.",
RK.LOG_INFO
RK.LOG_INFO,
)
# Make a new routing key which returns message to this queue
rk_transfer_start = ".".join([self.rk_parts[0], self.rk_parts[1], RK.START])
Expand Down
Loading
Loading