Skip to content

Commit

Permalink
Merge pull request #2 from Xarthisius/influx_logger
Browse files Browse the repository at this point in the history
Add logging to InfluxDB
  • Loading branch information
Xarthisius authored Jan 16, 2025
2 parents cf5ce1f + f2a225b commit db2acb7
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defaults:

jobs:
build:
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
strategy:
matrix:
python-version: ["3.12"]
Expand Down Expand Up @@ -45,6 +45,6 @@ jobs:
GIRDER_MAX_CURSOR_TIMEOUT_MS: 60000
run: tox -e pytest
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
21 changes: 17 additions & 4 deletions girder_wholetale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import jsonschema
import six
import validators
from girder import events
from girder import auditLogger, events
from girder.api import access
from girder.api.describe import Description, autoDescribeRoute, describeRoute
from girder.api.rest import RestException, boundHandler, loadmodel
Expand Down Expand Up @@ -52,9 +52,9 @@
from .lib.path_mapper import PathMapper
from .models.instance import Instance as InstanceModel
from .models.lock import Lock as LockModel
from .models.version_hierarchy import VersionHierarchyModel
from .models.session import Session as SessionModel
from .models.transfer import Transfer as TransferModel
from .models.version_hierarchy import VersionHierarchyModel
from .rest.account import Account
from .rest.dataset import Dataset
from .rest.dm import DM
Expand All @@ -77,6 +77,7 @@
external_auth_providers_schema,
repository_to_provider_schema,
)
from .utils import add_influx_handler

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -231,6 +232,10 @@ def _validateLogo(doc):
PluginSettings.CONTACT_HREF,
PluginSettings.BUG_HREF,
PluginSettings.MOUNTS,
PluginSettings.INFLUXDB_URL,
PluginSettings.INFLUXDB_TOKEN,
PluginSettings.INFLUXDB_ORG,
PluginSettings.INFLUXDB_BUCKET,
}
)
def validateHref(doc):
Expand Down Expand Up @@ -807,6 +812,7 @@ def load(self, info):

pathMapper = PathMapper()
from .lib.transfer_manager import SimpleTransferManager

transferManager = SimpleTransferManager(pathMapper)

# a GC that does nothing
Expand All @@ -825,7 +831,10 @@ def load(self, info):
LRUSortingScheme(),
),
)
from .lib.cache_manager import SimpleCacheManager # Needs models to be registered
from .lib.cache_manager import (
SimpleCacheManager,
)

cacheManager = SimpleCacheManager(
Setting(), transferManager, fileGC, pathMapper
)
Expand Down Expand Up @@ -964,7 +973,11 @@ def sessionDeleted(event):
metricsLogger.setLevel(logging.INFO)
metricsLogger.addHandler(_MetricsHandler())
VersionHierarchyModel().resetCrashedCriticalSections()

if Setting().get(PluginSettings.INFLUXDB_BUCKET):
add_influx_handler(
auditLogger,
Setting().get(PluginSettings.INFLUXDB_BUCKET),
)
registerPluginStaticContent(
plugin="wholetale",
css=["/style.css"],
Expand Down
8 changes: 8 additions & 0 deletions girder_wholetale/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class PluginSettings:
GC_RUN_INTERVAL = "dm.gc_run_interval"
GC_COLLECT_START_FRACTION = "dm.gc_collect_start_fraction"
GC_COLLECT_END_FRACTION = "dm.gc_collect_end_fraction"
INFLUXDB_URL = "wholetale.influxdb_url"
INFLUXDB_TOKEN = "wholetale.influxdb_token"
INFLUXDB_ORG = "wholetale.influxdb_org"
INFLUXDB_BUCKET = "wholetale.influxdb_bucket"


SettingDefault.defaults.update(
Expand Down Expand Up @@ -165,6 +169,10 @@ class PluginSettings:
PluginSettings.RUNS_DIRS_ROOT: "/tmp/wt/runs-dirs",
PluginSettings.VERSIONS_DIRS_ROOT: "/tmp/wt/versions-dirs",
PluginSettings.DAV_SERVER: False,
PluginSettings.INFLUXDB_URL: "http://images.local.xarthisius.xyz:8086",
PluginSettings.INFLUXDB_TOKEN: "",
PluginSettings.INFLUXDB_ORG: "my_org",
PluginSettings.INFLUXDB_BUCKET: "",
}
)

Expand Down
98 changes: 98 additions & 0 deletions girder_wholetale/tests/influxlog_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import io

import cherrypy
import mock
import pytest
from girder import auditLogger
from girder.models.folder import Folder
from girder.models.setting import Setting
from girder.models.upload import Upload
from pytest_girder.assertions import assertStatusOk

from girder_wholetale.constants import PluginSettings
from girder_wholetale.utils import add_influx_handler


@pytest.fixture
def influxdb_client():
return mock.Mock()


@pytest.fixture
def enable_influxlog_setting(influxdb_client):
Setting().set(PluginSettings.INFLUXDB_BUCKET, "mock_bucket")
Setting().set(PluginSettings.INFLUXDB_TOKEN, "mock_token")

yield

Setting().set(PluginSettings.INFLUXDB_BUCKET, "")
Setting().set(PluginSettings.INFLUXDB_TOKEN, "")


@pytest.mark.plugin("wholetale")
def test_influxdb_logger(server, admin, enable_influxlog_setting, fsAssetstore):
with mock.patch(
"girder_wholetale.utils.influxdb.InfluxDBClient"
) as influxdb_client:
add_influx_handler(auditLogger, Setting().get(PluginSettings.INFLUXDB_BUCKET))
influx_logger = auditLogger.handlers[0]
assert influx_logger.bucket == "mock_bucket"
influxdb_client.assert_called_once_with(
url=Setting().get(PluginSettings.INFLUXDB_URL),
token="mock_token",
org=Setting().get(PluginSettings.INFLUXDB_ORG),
)

resp = server.request(path="/user/me", method="GET", user=admin)
assertStatusOk(resp)

write = influx_logger.write_api.write
assert write.call_count == 1
assert write.call_args_list[0].kwargs["record"]._fields == {
"method": "GET",
"status": 200,
"route": "user/me",
"params": "{}",
"message": "rest.request",
}

folder = Folder().createFolder(
admin, "folder", parentType="user", public=True, creator=admin
)
assert write.call_count == 2
assert write.call_args_list[1].kwargs["record"]._fields == {
"collection": "folder",
"id": str(folder["_id"]),
}

file = Upload().uploadFromFile(
io.BytesIO(b"blah blah"),
9,
"test.txt",
parentType="folder",
parent=folder,
user=admin,
mimeType="text/plain",
)

resp = server.request(
path="/file/%s/download" % file["_id"],
method="GET",
user=admin,
isJson=False,
)
assertStatusOk(resp)
assert resp.collapse_body() == b"blah blah"
assert write.call_count == 7
assert write.call_args_list[-2].kwargs["record"]._fields == {
"fileId": str(file["_id"]),
"startBytes": None,
"endBytes": None,
"extraParameters": None,
}
assert influx_logger.write_api.close.call_count == 0
assert influxdb_client.return_value.close.call_count == 0
cherrypy.engine.restart()
assert influx_logger.write_api.close.call_count == 1
assert influxdb_client.return_value.close.call_count == 1
assert auditLogger.handlers == []
100 changes: 95 additions & 5 deletions girder_wholetale/utils.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,108 @@
import os
import datetime
import json
import logging
import os
import pathlib
import six.moves.urllib as urllib
from urllib.parse import quote_plus

from girder.utility.model_importer import ModelImporter
try:
import influxdb_client as influxdb
except ImportError:
influxdb = None

import cherrypy
from girder.models.notification import Notification
from girder.models.user import User
from girder.models.setting import Setting
from girder.models.user import User
from girder.utility.model_importer import ModelImporter

from .constants import PluginSettings

NOTIFICATION_EXP_HOURS = 1
WT_EVENT_EXP_SECONDS = int(os.environ.get("GIRDER_WT_EVENT_EXP_SECONDS", 5))


class InfluxHandler(logging.Handler):
def __init__(self, bucket):
if influxdb is None:
raise ImportError("InfluxDB client not installed")

super().__init__()
self.client = influxdb.InfluxDBClient(
url=Setting().get(PluginSettings.INFLUXDB_URL),
token=Setting().get(PluginSettings.INFLUXDB_TOKEN),
org=Setting().get(PluginSettings.INFLUXDB_ORG),
)
self.write_api = self.client.write_api()
self.bucket = bucket

@staticmethod
def _document_create(record):
return (
influxdb.Point("document")
.tag("level", record.levelname)
.field("collection", record.details.get("collection"))
.field("id", str(record.details.get("id")))
.time(int(record.created * 1e9), influxdb.WritePrecision.NS)
)

@staticmethod
def _download(record):
return (
influxdb.Point("download")
.tag("level", record.levelname)
.field("fileId", str(record.details.get("fileId")))
.field("startBytes", record.details.get("startBytes"))
.field("endBytes", record.details.get("endBytes"))
.field("extraParameters", record.details.get("extraParameters"))
.time(int(record.created * 1e9), influxdb.WritePrecision.NS)
)

@staticmethod
def _rest_request(record):
return (
influxdb.Point("rest_request")
.tag("level", record.levelname)
.field("method", record.details.get("method"))
.field("status", record.details.get("status"))
.field("route", "/".join(record.details.get("route", "")))
.field("params", json.dumps(record.details.get("params")))
.field("message", record.getMessage())
.time(int(record.created * 1e9), influxdb.WritePrecision.NS)
)

def emit(self, record):
match record.getMessage():
case "document.create":
point = self._document_create(record)
case "file.download":
point = self._download(record)
case "rest.request":
point = self._rest_request(record)
self.write_api.write(bucket=self.bucket, record=point)

def close(self):
self.write_api._write_options.write_scheduler.executor.shutdown(wait=False)
self.write_api.close()
self.client.close()
super().close()


def add_influx_handler(logger, bucket):
def stop_influx_client():
for handler in logger.handlers:
if isinstance(handler, InfluxHandler):
logger.removeHandler(handler)
handler.close()

stop_influx_client() # Remove any existing handlers
logger.addHandler(InfluxHandler(bucket))
stop_influx_client.priority = 10
cherrypy.engine.subscribe(
"stop", stop_influx_client
) # ensure we clean up on restart


def get_tale_dir_root(tale: dict, root_path_setting: str) -> pathlib.Path:
root = Setting().get(root_path_setting)
return pathlib.Path(root) / str(tale["_id"])[0:2] / str(tale["_id"])
Expand Down Expand Up @@ -41,7 +131,7 @@ def esc(value):
:return: The escaped string
:rtype: str
"""
return urllib.parse.quote_plus(value)
return quote_plus(value)


def notify_event(users, event, affectedIds):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ httpio>=0.3.0
fs
pathvalidate
bdbag
influxdb-client
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"rdflib",
"celery[redis]",
"pathvalidate",
"influxdb-client",
"python-magic",
"requests",
"validators",
Expand Down

0 comments on commit db2acb7

Please sign in to comment.