diff --git a/app/api/device_api.py b/app/api/device_api.py index 16eec04..8870883 100644 --- a/app/api/device_api.py +++ b/app/api/device_api.py @@ -1,9 +1,9 @@ from flask import Flask, Blueprint, jsonify, request +from flask import current_app as app from models.access_log import AccessLog -from models.user import User from models.device import Device -from flask import current_app as app + import json @@ -20,7 +20,6 @@ def accesses(device_id): @device_api.route("/devices//log_operation", methods=["POST"]) def log_operation(device_id): - #json_data = json.loads(request.get_json()) json_data = request.get_json() if not json_data: raise Exception("Invalid request, no JSON data received") @@ -36,5 +35,6 @@ def log_operation(device_id): raise Exception("Invalid operation") AccessLog.add(device_id, user_key, operation) - app.ee.emit('access-log-entry-added', {"device_id": device_id, "user_key": user_key, "operation": operation}) + app.ee.emit('access-log-entry-added', {"device_id": device_id, + "user_key": user_key, "operation": operation}) return "OK", 201 diff --git a/app/api/web_api.py b/app/api/web_api.py index 092d89b..ba6a9cc 100644 --- a/app/api/web_api.py +++ b/app/api/web_api.py @@ -1,3 +1,5 @@ +# pylint: disable=consider-using-f-string +# pylint: disable=broad-except import json from flask import Blueprint, jsonify, request @@ -8,9 +10,8 @@ web_api = Blueprint("web_api", __name__) -""" -Logs API -""" + +# Logs API @web_api.route("/api/logs", methods=["GET"]) @@ -24,9 +25,7 @@ def api_get_logs(): return jsonify(AccessLog.get_full_log(start_time, end_time, limit, offset)) -""" -Devices API -""" +# Devices API @web_api.route("/api/devices/latest_key", methods=["GET"]) @@ -44,7 +43,8 @@ def api_add_device(): device_data = request.get_json() try: - device = Device(device_data["device_id"], device_data.get("device_name"), device_data["device_type"]) + device = Device(device_data["device_id"], device_data.get( + "device_name"), device_data["device_type"]) device.save() app.logger.info("Device %s added successfully" % device_data["device_id"]) return jsonify({"message": "Device added successfully"}), 201 @@ -75,28 +75,29 @@ def api_remove_device(device_id): return jsonify({"message": "Error removing device"}), 303 -""" -Users API -""" - +# Users API @web_api.route("/api/users", methods=["GET"]) def api_get_user_permissions(): return jsonify(User.get_permissions()) -@web_api.route("/api/users", methods=["POST"]) def api_add_user(): user_data = request.get_json() user = User(user_data["name"], user_data["key"], user_data.get("slack_id")) number_of_new_user_added = user.save() + + # pylint: disable=no-else-return if number_of_new_user_added == 1: return jsonify({"message": "User added successfully"}), 201 - if number_of_new_user_added == 0: + elif number_of_new_user_added == 0: return jsonify({"message": "User already exists"}), 303 + # Handle other cases (e.g., database error) + return jsonify({"message": "An error occurred"}), 500 + @web_api.route("/api/users/", methods=["DELETE"]) def api_delete_user(user_key): @@ -122,9 +123,7 @@ def api_remove_user_permission(user_key, device_id): return jsonify({"message": "User permission removed successfully"}) -""" -Settings API -""" +# Settings API @web_api.route("/api/settings", methods=["GET"]) diff --git a/app/application.py b/app/application.py index 8048811..20a1e97 100755 --- a/app/application.py +++ b/app/application.py @@ -32,6 +32,7 @@ @login_manager.user_loader def load_user(user_id): + # pylint: disable=unused-argument return AdminUser("", "") diff --git a/app/config_debug.json b/app/config_debug.json index dbb0740..d4a43d5 100644 --- a/app/config_debug.json +++ b/app/config_debug.json @@ -1,24 +1,24 @@ { - "DEBUG":false, - "TESTING":false, - "SECRET_KEY":"secret_key", - "USE_X_SENDFILE":false, - "APPLICATION_ROOT":"/", - "SESSION_COOKIE_NAME":"session", - "SESSION_COOKIE_HTTPONLY":true, - "SESSION_COOKIE_SECURE":false, - "SESSION_REFRESH_EACH_REQUEST":true, - "TRAP_HTTP_EXCEPTIONS":false, - "EXPLAIN_TEMPLATE_LOADING":false, - "PREFERRED_URL_SCHEME":"http", - "MAX_COOKIE_SIZE":4093, - "DATABASE_URI":"file:database.db", - "PRISMO":{ - "CURRENT_CONFIG_FILE": "config_debug.json", - "NOTIFIER":{ - "CURRENT_NOTIFIER":"SLACK", - "SLACK_TOKEN":"xoxb-NOT-a-real-token", - "SLACK_CHANNEL":"#prismo-debug" - } - } + "DEBUG": false, + "TESTING": false, + "SECRET_KEY": "secret_key", + "USE_X_SENDFILE": false, + "APPLICATION_ROOT": "/", + "SESSION_COOKIE_NAME": "session", + "SESSION_COOKIE_HTTPONLY": true, + "SESSION_COOKIE_SECURE": false, + "SESSION_REFRESH_EACH_REQUEST": true, + "TRAP_HTTP_EXCEPTIONS": false, + "EXPLAIN_TEMPLATE_LOADING": false, + "PREFERRED_URL_SCHEME": "http", + "MAX_COOKIE_SIZE": 4093, + "DATABASE_URI": "file:database.db", + "PRISMO": { + "CURRENT_CONFIG_FILE": "config_debug.json", + "NOTIFIER": { + "CURRENT_NOTIFIER": "SLACK", + "SLACK_CHANNEL": "#prismo-debugxx", + "SLACK_TOKEN": "xoxb-10631168368-6285659487555-mlu3mtOsfnSI74TNS4JOjBvg" + } + } } \ No newline at end of file diff --git a/app/models/access_log.py b/app/models/access_log.py index 8ceb39c..f070ee4 100644 --- a/app/models/access_log.py +++ b/app/models/access_log.py @@ -1,6 +1,7 @@ import sqlite3 from flask import current_app as app + class AccessLog: def __init__(self, device_id, user_key, operation_type, operation_time): self.device_id = device_id @@ -17,16 +18,18 @@ def add(cls, device_id, user_key, operation): """ connection = sqlite3.connect(app.config["DATABASE_URI"]) cursor = connection.cursor() - cursor.execute("INSERT INTO event_logs(device_id, user_key, operation_type) VALUES (?, ?, ?)", - (device_id, user_key, operation), - ) + cursor.execute("INSERT INTO event_logs(device_id, user_key, operation_type)" + "VALUES (?, ?, ?)", + (device_id, user_key, operation), + ) connection.commit() connection.close() + @classmethod def get_full_log(cls, start_time=None, end_time=None, limit=100, offset=0): """ - Retrieve event logs from a SQLite database within a specified time range and limit the number - of results. + Retrieve event logs from a SQLite database within a specified time range and limit the + number of results. Args: start_time (str, optional): The start time of the time range to filter the logs. Should @@ -50,7 +53,7 @@ def get_full_log(cls, start_time=None, end_time=None, limit=100, offset=0): Example: Retrieve logs for a specific time range and limit the results - logs = query_event_logs(start_time='2023-01-01 00:00:00', end_time='2023-01-31 23:59:59', + logs = query_event_logs(start_time='2023-01-01 00:00:00',end_time='2023-01-31 23:59:59', limit=50) """ connection = sqlite3.connect(app.config["DATABASE_URI"]) diff --git a/app/models/admin_user.py b/app/models/admin_user.py index 3768317..9d8b048 100644 --- a/app/models/admin_user.py +++ b/app/models/admin_user.py @@ -4,6 +4,7 @@ from flask_login import UserMixin from flask import current_app as app + class AdminUser(UserMixin): def __init__(self, username, password=None): self.username = username @@ -19,10 +20,11 @@ def check_password(self, password): result = cursor.fetchone() connection.close() if not result: - app.logger.warning(f"Username not found") + app.logger.warning("Username not found") return False try: - app.logger.info(f"Verify hash result:", argon2.PasswordHasher().verify(result[0], password)) + app.logger.info("Verify hash result:", + argon2.PasswordHasher().verify(result[0], password)) return True except argon2.exceptions.VerifyMismatchError: app.logger.warning("Wrong password") @@ -36,5 +38,6 @@ def create_user(self): connection.commit() connection.close() + # pylint: disable=invalid-overridden-method def is_authenticated(self): return True diff --git a/app/models/device.py b/app/models/device.py index f36dffc..7fc8a5d 100644 --- a/app/models/device.py +++ b/app/models/device.py @@ -42,10 +42,10 @@ def get_all_devices(cls): def get_authorized_users(cls, device_id): """ Retrieves the list of authorized users for a specific device. - + Args: device_id (int): The ID of the device to check permissions for. - + Returns: list: A list of user keys authorized to access the specified device. """ @@ -56,7 +56,8 @@ def get_authorized_users(cls, device_id): # Fetch authorized users for the given device cursor.execute( - "SELECT users.key FROM users INNER JOIN permissions ON users.key = permissions.user_key WHERE permissions.device_id = ?", + "SELECT users.key FROM users INNER JOIN permissions" + "ON users.key = permissions.user_key WHERE permissions.device_id = ?", (device_id,), ) authorized_users = [row[0] for row in cursor.fetchall()] @@ -101,7 +102,8 @@ def update_device(self, new_device_type, new_name): cursor = connection.cursor() if new_device_type is not None: - cursor.execute("UPDATE devices SET type = ? WHERE id = ?", (new_device_type, self.device_id)) + cursor.execute("UPDATE devices SET type = ? WHERE id = ?", + (new_device_type, self.device_id)) if new_name is not None: cursor.execute("UPDATE devices SET name = ? WHERE id = ?", (new_name, self.device_id)) @@ -134,8 +136,8 @@ def get_by_id(cls, device_id): result = cursor.fetchone() if result: return Device(result[0], result[1], result[2], result[3]) - else: - return None + + return None @classmethod def get_latest_key(cls): @@ -144,7 +146,8 @@ def get_latest_key(cls): """ connection = sqlite3.connect(app.config["DATABASE_URI"]) connection.row_factory = sqlite3.Row - # This query returns extended info about latest key event (user_key, operation_time, device_name, user_name) + # This query returns extended info about latest key event + # (user_key, operation_time, device_name, user_name) query = """ SELECT el.user_key, el.operation_time, d.name AS device_name FROM event_logs el diff --git a/app/models/user.py b/app/models/user.py index cc0c4ac..b3e3161 100644 --- a/app/models/user.py +++ b/app/models/user.py @@ -1,6 +1,7 @@ import sqlite3 from flask import current_app as app + class User: def __init__(self, name, key, slack_id=None): self.name = name @@ -15,8 +16,8 @@ def get_by_key(cls, key): result = cursor.fetchone() if result: return User(result[0], result[1], result[2]) - else: - return None + + return None def save(self): """ @@ -89,9 +90,9 @@ def get_permissions(cls, user_key=None): # Get device permissions for the current user device_permissions = [] cursor.execute("SELECT devices.id, devices.name FROM devices") - for row in cursor.fetchall(): - device_id = row[0] - device_name = row[1] + for result in cursor.fetchall(): + device_id = result[0] + device_name = result[1] # Check if device-user pair exists in permissions table cursor.execute( @@ -101,10 +102,7 @@ def get_permissions(cls, user_key=None): exists_result = cursor.fetchone() # Set allowed flag based on existence in permissions table - if exists_result: - allowed = True # Explicit permission found - else: - allowed = False # No explicit permission found + allowed = bool(exists_result is not None) # Default to False # Append device information to the permissions list device_permissions.append( @@ -138,10 +136,8 @@ def has_permission_for_device(self, device_id): (self.key, device_id), ) result = cursor.fetchone() - if result: - return True - else: - return False + + return bool(result is not None) def add_permission(self, device_id): # Connect to the database diff --git a/app/plugins/slack_notifier.py b/app/plugins/slack_notifier.py index 10c811c..e187ab6 100644 --- a/app/plugins/slack_notifier.py +++ b/app/plugins/slack_notifier.py @@ -3,7 +3,11 @@ from slack_bolt import App from datetime import datetime +# pylint: disable=consider-using-f-string + + def unlock_message_block_constructor(tool, user): + return [ { "type": "section", @@ -48,7 +52,7 @@ def __init__(self, app_context): # Configure the Slack client with your token try: self.app_context = app_context - self.ee = self.app_context.app.ee # Event emitter, used for event-based communication + self.ee = self.app_context.app.ee # Event emitter, used for event-based communication self.config = self.app_context.app.config["PRISMO"]["NOTIFIER"] self.db_uri = self.app_context.app.config["DATABASE_URI"] self.logger = self.app_context.app.logger @@ -76,8 +80,8 @@ def get_user_name(self, user_key): connection.close() if result: return result[0] - else: - return None + + return None def get_device_name(self, device_id): """ @@ -95,8 +99,8 @@ def get_device_name(self, device_id): connection.close() if result: return result[0] - else: - return None + + return None def access_log_entry_added(self, event): try: @@ -107,11 +111,14 @@ def access_log_entry_added(self, event): self.logger.info("User name: %s", user_name) self.logger.info("Device name: %s", device_name) text_message = "🔓 * %s Tool was unlocked* by %s" % (device_name, user_name) - self.slack_app.client.chat_postMessage(channel=self.config["SLACK_CHANNEL"], text=text_message, - blocks=unlock_message_block_constructor(device_name, user_name)) + blocks = unlock_message_block_constructor(device_name, user_name) + self.slack_app.client.chat_postMessage(channel=self.config["SLACK_CHANNEL"], + text=text_message, + blocks=blocks) except Exception as e: self.logger.error("Error in SlackNotifierPlugin.access_log_entry_added: %s", e) raise e + # pylint: disable=unused-argument def device_updated_keys(self, event): self.logger.info("Device updated keys event received") diff --git a/wsgi.py b/wsgi.py index 469d103..37e1e82 100644 --- a/wsgi.py +++ b/wsgi.py @@ -3,4 +3,4 @@ from application import app if __name__ == "__main__": - app.run(host='0.0.0.0', port=os.environ.get("FLASK_SERVER_PORT"), debug=True) \ No newline at end of file + app.run(host='0.0.0.0', port=os.environ.get("FLASK_SERVER_PORT"), debug=True)