diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 2ca4fe1..66f6856 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -13,7 +13,7 @@ defaults: jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 strategy: matrix: python-version: ["3.12"] @@ -45,6 +45,6 @@ jobs: GIRDER_MAX_CURSOR_TIMEOUT_MS: 60000 run: tox -e pytest - name: Upload Coverage to Codecov - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 with: token: ${{ secrets.CODECOV_TOKEN }} diff --git a/girder_wholetale/__init__.py b/girder_wholetale/__init__.py index eb47112..ab96970 100644 --- a/girder_wholetale/__init__.py +++ b/girder_wholetale/__init__.py @@ -12,7 +12,7 @@ import jsonschema import six import validators -from girder import events +from girder import auditLogger, events from girder.api import access from girder.api.describe import Description, autoDescribeRoute, describeRoute from girder.api.rest import RestException, boundHandler, loadmodel @@ -52,9 +52,9 @@ from .lib.path_mapper import PathMapper from .models.instance import Instance as InstanceModel from .models.lock import Lock as LockModel -from .models.version_hierarchy import VersionHierarchyModel from .models.session import Session as SessionModel from .models.transfer import Transfer as TransferModel +from .models.version_hierarchy import VersionHierarchyModel from .rest.account import Account from .rest.dataset import Dataset from .rest.dm import DM @@ -77,6 +77,7 @@ external_auth_providers_schema, repository_to_provider_schema, ) +from .utils import add_influx_handler logger = logging.getLogger(__name__) @@ -231,6 +232,10 @@ def _validateLogo(doc): PluginSettings.CONTACT_HREF, PluginSettings.BUG_HREF, PluginSettings.MOUNTS, + PluginSettings.INFLUXDB_URL, + PluginSettings.INFLUXDB_TOKEN, + PluginSettings.INFLUXDB_ORG, + PluginSettings.INFLUXDB_BUCKET, } ) def validateHref(doc): @@ -807,6 +812,7 @@ def load(self, info): pathMapper = PathMapper() from .lib.transfer_manager import SimpleTransferManager + transferManager = SimpleTransferManager(pathMapper) # a GC that does nothing @@ -825,7 +831,10 @@ def load(self, info): LRUSortingScheme(), ), ) - from .lib.cache_manager import SimpleCacheManager # Needs models to be registered + from .lib.cache_manager import ( + SimpleCacheManager, + ) + cacheManager = SimpleCacheManager( Setting(), transferManager, fileGC, pathMapper ) @@ -964,7 +973,11 @@ def sessionDeleted(event): metricsLogger.setLevel(logging.INFO) metricsLogger.addHandler(_MetricsHandler()) VersionHierarchyModel().resetCrashedCriticalSections() - + if Setting().get(PluginSettings.INFLUXDB_BUCKET): + add_influx_handler( + auditLogger, + Setting().get(PluginSettings.INFLUXDB_BUCKET), + ) registerPluginStaticContent( plugin="wholetale", css=["/style.css"], diff --git a/girder_wholetale/constants.py b/girder_wholetale/constants.py index 70909ea..58329c3 100644 --- a/girder_wholetale/constants.py +++ b/girder_wholetale/constants.py @@ -51,6 +51,10 @@ class PluginSettings: GC_RUN_INTERVAL = "dm.gc_run_interval" GC_COLLECT_START_FRACTION = "dm.gc_collect_start_fraction" GC_COLLECT_END_FRACTION = "dm.gc_collect_end_fraction" + INFLUXDB_URL = "wholetale.influxdb_url" + INFLUXDB_TOKEN = "wholetale.influxdb_token" + INFLUXDB_ORG = "wholetale.influxdb_org" + INFLUXDB_BUCKET = "wholetale.influxdb_bucket" SettingDefault.defaults.update( @@ -165,6 +169,10 @@ class PluginSettings: PluginSettings.RUNS_DIRS_ROOT: "/tmp/wt/runs-dirs", PluginSettings.VERSIONS_DIRS_ROOT: "/tmp/wt/versions-dirs", PluginSettings.DAV_SERVER: False, + PluginSettings.INFLUXDB_URL: "http://images.local.xarthisius.xyz:8086", + PluginSettings.INFLUXDB_TOKEN: "", + PluginSettings.INFLUXDB_ORG: "my_org", + PluginSettings.INFLUXDB_BUCKET: "", } ) diff --git a/girder_wholetale/tests/influxlog_test.py b/girder_wholetale/tests/influxlog_test.py new file mode 100644 index 0000000..6d69ef1 --- /dev/null +++ b/girder_wholetale/tests/influxlog_test.py @@ -0,0 +1,98 @@ +import io + +import cherrypy +import mock +import pytest +from girder import auditLogger +from girder.models.folder import Folder +from girder.models.setting import Setting +from girder.models.upload import Upload +from pytest_girder.assertions import assertStatusOk + +from girder_wholetale.constants import PluginSettings +from girder_wholetale.utils import add_influx_handler + + +@pytest.fixture +def influxdb_client(): + return mock.Mock() + + +@pytest.fixture +def enable_influxlog_setting(influxdb_client): + Setting().set(PluginSettings.INFLUXDB_BUCKET, "mock_bucket") + Setting().set(PluginSettings.INFLUXDB_TOKEN, "mock_token") + + yield + + Setting().set(PluginSettings.INFLUXDB_BUCKET, "") + Setting().set(PluginSettings.INFLUXDB_TOKEN, "") + + +@pytest.mark.plugin("wholetale") +def test_influxdb_logger(server, admin, enable_influxlog_setting, fsAssetstore): + with mock.patch( + "girder_wholetale.utils.influxdb.InfluxDBClient" + ) as influxdb_client: + add_influx_handler(auditLogger, Setting().get(PluginSettings.INFLUXDB_BUCKET)) + influx_logger = auditLogger.handlers[0] + assert influx_logger.bucket == "mock_bucket" + influxdb_client.assert_called_once_with( + url=Setting().get(PluginSettings.INFLUXDB_URL), + token="mock_token", + org=Setting().get(PluginSettings.INFLUXDB_ORG), + ) + + resp = server.request(path="/user/me", method="GET", user=admin) + assertStatusOk(resp) + + write = influx_logger.write_api.write + assert write.call_count == 1 + assert write.call_args_list[0].kwargs["record"]._fields == { + "method": "GET", + "status": 200, + "route": "user/me", + "params": "{}", + "message": "rest.request", + } + + folder = Folder().createFolder( + admin, "folder", parentType="user", public=True, creator=admin + ) + assert write.call_count == 2 + assert write.call_args_list[1].kwargs["record"]._fields == { + "collection": "folder", + "id": str(folder["_id"]), + } + + file = Upload().uploadFromFile( + io.BytesIO(b"blah blah"), + 9, + "test.txt", + parentType="folder", + parent=folder, + user=admin, + mimeType="text/plain", + ) + + resp = server.request( + path="/file/%s/download" % file["_id"], + method="GET", + user=admin, + isJson=False, + ) + assertStatusOk(resp) + assert resp.collapse_body() == b"blah blah" + assert write.call_count == 7 + assert write.call_args_list[-2].kwargs["record"]._fields == { + "fileId": str(file["_id"]), + "startBytes": None, + "endBytes": None, + "extraParameters": None, + } + assert influx_logger.write_api.close.call_count == 0 + assert influxdb_client.return_value.close.call_count == 0 + cherrypy.engine.restart() + assert influx_logger.write_api.close.call_count == 1 + assert influxdb_client.return_value.close.call_count == 1 + assert auditLogger.handlers == [] diff --git a/girder_wholetale/utils.py b/girder_wholetale/utils.py index a3aba34..1d26269 100644 --- a/girder_wholetale/utils.py +++ b/girder_wholetale/utils.py @@ -1,18 +1,108 @@ -import os import datetime +import json +import logging +import os import pathlib -import six.moves.urllib as urllib +from urllib.parse import quote_plus -from girder.utility.model_importer import ModelImporter +try: + import influxdb_client as influxdb +except ImportError: + influxdb = None + +import cherrypy from girder.models.notification import Notification -from girder.models.user import User from girder.models.setting import Setting +from girder.models.user import User +from girder.utility.model_importer import ModelImporter +from .constants import PluginSettings NOTIFICATION_EXP_HOURS = 1 WT_EVENT_EXP_SECONDS = int(os.environ.get("GIRDER_WT_EVENT_EXP_SECONDS", 5)) +class InfluxHandler(logging.Handler): + def __init__(self, bucket): + if influxdb is None: + raise ImportError("InfluxDB client not installed") + + super().__init__() + self.client = influxdb.InfluxDBClient( + url=Setting().get(PluginSettings.INFLUXDB_URL), + token=Setting().get(PluginSettings.INFLUXDB_TOKEN), + org=Setting().get(PluginSettings.INFLUXDB_ORG), + ) + self.write_api = self.client.write_api() + self.bucket = bucket + + @staticmethod + def _document_create(record): + return ( + influxdb.Point("document") + .tag("level", record.levelname) + .field("collection", record.details.get("collection")) + .field("id", str(record.details.get("id"))) + .time(int(record.created * 1e9), influxdb.WritePrecision.NS) + ) + + @staticmethod + def _download(record): + return ( + influxdb.Point("download") + .tag("level", record.levelname) + .field("fileId", str(record.details.get("fileId"))) + .field("startBytes", record.details.get("startBytes")) + .field("endBytes", record.details.get("endBytes")) + .field("extraParameters", record.details.get("extraParameters")) + .time(int(record.created * 1e9), influxdb.WritePrecision.NS) + ) + + @staticmethod + def _rest_request(record): + return ( + influxdb.Point("rest_request") + .tag("level", record.levelname) + .field("method", record.details.get("method")) + .field("status", record.details.get("status")) + .field("route", "/".join(record.details.get("route", ""))) + .field("params", json.dumps(record.details.get("params"))) + .field("message", record.getMessage()) + .time(int(record.created * 1e9), influxdb.WritePrecision.NS) + ) + + def emit(self, record): + match record.getMessage(): + case "document.create": + point = self._document_create(record) + case "file.download": + point = self._download(record) + case "rest.request": + point = self._rest_request(record) + self.write_api.write(bucket=self.bucket, record=point) + + def close(self): + self.write_api._write_options.write_scheduler.executor.shutdown(wait=False) + self.write_api.close() + self.client.close() + super().close() + + +def add_influx_handler(logger, bucket): + def stop_influx_client(): + for handler in logger.handlers: + if isinstance(handler, InfluxHandler): + logger.removeHandler(handler) + handler.close() + + stop_influx_client() # Remove any existing handlers + logger.addHandler(InfluxHandler(bucket)) + stop_influx_client.priority = 10 + cherrypy.engine.subscribe( + "stop", stop_influx_client + ) # ensure we clean up on restart + + def get_tale_dir_root(tale: dict, root_path_setting: str) -> pathlib.Path: root = Setting().get(root_path_setting) return pathlib.Path(root) / str(tale["_id"])[0:2] / str(tale["_id"]) @@ -41,7 +131,7 @@ def esc(value): :return: The escaped string :rtype: str """ - return urllib.parse.quote_plus(value) + return quote_plus(value) def notify_event(users, event, affectedIds): diff --git a/requirements.txt b/requirements.txt index 4f6cf3e..028e979 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ httpio>=0.3.0 fs pathvalidate bdbag +influxdb-client diff --git a/setup.py b/setup.py index 0b0766c..ca0e493 100644 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ "rdflib", "celery[redis]", "pathvalidate", + "influxdb-client", "python-magic", "requests", "validators",