Skip to content

Commit

Permalink
Pylint + Autopep8 fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
leechwort committed Dec 6, 2023
1 parent 98d173c commit 761b657
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 78 deletions.
8 changes: 4 additions & 4 deletions app/api/device_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from flask import Flask, Blueprint, jsonify, request
from flask import current_app as app

from models.access_log import AccessLog
from models.user import User
from models.device import Device
from flask import current_app as app

import json


Expand All @@ -20,7 +20,6 @@ def accesses(device_id):
@device_api.route("/devices/<device_id>/log_operation", methods=["POST"])
def log_operation(device_id):

#json_data = json.loads(request.get_json())
json_data = request.get_json()
if not json_data:
raise Exception("Invalid request, no JSON data received")
Expand All @@ -36,5 +35,6 @@ def log_operation(device_id):
raise Exception("Invalid operation")

AccessLog.add(device_id, user_key, operation)
app.ee.emit('access-log-entry-added', {"device_id": device_id, "user_key": user_key, "operation": operation})
app.ee.emit('access-log-entry-added', {"device_id": device_id,
"user_key": user_key, "operation": operation})
return "OK", 201
31 changes: 15 additions & 16 deletions app/api/web_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# pylint: disable=consider-using-f-string
# pylint: disable=broad-except
import json

from flask import Blueprint, jsonify, request
Expand All @@ -8,9 +10,8 @@

web_api = Blueprint("web_api", __name__)

"""
Logs API
"""

# Logs API


@web_api.route("/api/logs", methods=["GET"])
Expand All @@ -24,9 +25,7 @@ def api_get_logs():
return jsonify(AccessLog.get_full_log(start_time, end_time, limit, offset))


"""
Devices API
"""
# Devices API


@web_api.route("/api/devices/latest_key", methods=["GET"])
Expand All @@ -44,7 +43,8 @@ def api_add_device():
device_data = request.get_json()

try:
device = Device(device_data["device_id"], device_data.get("device_name"), device_data["device_type"])
device = Device(device_data["device_id"], device_data.get(
"device_name"), device_data["device_type"])
device.save()
app.logger.info("Device %s added successfully" % device_data["device_id"])
return jsonify({"message": "Device added successfully"}), 201
Expand Down Expand Up @@ -75,28 +75,29 @@ def api_remove_device(device_id):
return jsonify({"message": "Error removing device"}), 303


"""
Users API
"""

# Users API

@web_api.route("/api/users", methods=["GET"])
def api_get_user_permissions():
return jsonify(User.get_permissions())


@web_api.route("/api/users", methods=["POST"])
def api_add_user():
user_data = request.get_json()

user = User(user_data["name"], user_data["key"], user_data.get("slack_id"))

number_of_new_user_added = user.save()

# pylint: disable=no-else-return
if number_of_new_user_added == 1:
return jsonify({"message": "User added successfully"}), 201
if number_of_new_user_added == 0:
elif number_of_new_user_added == 0:
return jsonify({"message": "User already exists"}), 303

# Handle other cases (e.g., database error)
return jsonify({"message": "An error occurred"}), 500


@web_api.route("/api/users/<user_key>", methods=["DELETE"])
def api_delete_user(user_key):
Expand All @@ -122,9 +123,7 @@ def api_remove_user_permission(user_key, device_id):
return jsonify({"message": "User permission removed successfully"})


"""
Settings API
"""
# Settings API


@web_api.route("/api/settings", methods=["GET"])
Expand Down
1 change: 1 addition & 0 deletions app/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

@login_manager.user_loader
def load_user(user_id):
# pylint: disable=unused-argument
return AdminUser("", "")


Expand Down
44 changes: 22 additions & 22 deletions app/config_debug.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
{
"DEBUG":false,
"TESTING":false,
"SECRET_KEY":"secret_key",
"USE_X_SENDFILE":false,
"APPLICATION_ROOT":"/",
"SESSION_COOKIE_NAME":"session",
"SESSION_COOKIE_HTTPONLY":true,
"SESSION_COOKIE_SECURE":false,
"SESSION_REFRESH_EACH_REQUEST":true,
"TRAP_HTTP_EXCEPTIONS":false,
"EXPLAIN_TEMPLATE_LOADING":false,
"PREFERRED_URL_SCHEME":"http",
"MAX_COOKIE_SIZE":4093,
"DATABASE_URI":"file:database.db",
"PRISMO":{
"CURRENT_CONFIG_FILE": "config_debug.json",
"NOTIFIER":{
"CURRENT_NOTIFIER":"SLACK",
"SLACK_TOKEN":"xoxb-NOT-a-real-token",
"SLACK_CHANNEL":"#prismo-debug"
}
}
"DEBUG": false,
"TESTING": false,
"SECRET_KEY": "secret_key",
"USE_X_SENDFILE": false,
"APPLICATION_ROOT": "/",
"SESSION_COOKIE_NAME": "session",
"SESSION_COOKIE_HTTPONLY": true,
"SESSION_COOKIE_SECURE": false,
"SESSION_REFRESH_EACH_REQUEST": true,
"TRAP_HTTP_EXCEPTIONS": false,
"EXPLAIN_TEMPLATE_LOADING": false,
"PREFERRED_URL_SCHEME": "http",
"MAX_COOKIE_SIZE": 4093,
"DATABASE_URI": "file:database.db",
"PRISMO": {
"CURRENT_CONFIG_FILE": "config_debug.json",
"NOTIFIER": {
"CURRENT_NOTIFIER": "SLACK",
"SLACK_CHANNEL": "#prismo-debugxx",
"SLACK_TOKEN": "xoxb-10631168368-6285659487555-mlu3mtOsfnSI74TNS4JOjBvg"
}
}
}
15 changes: 9 additions & 6 deletions app/models/access_log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sqlite3
from flask import current_app as app


class AccessLog:
def __init__(self, device_id, user_key, operation_type, operation_time):
self.device_id = device_id
Expand All @@ -17,16 +18,18 @@ def add(cls, device_id, user_key, operation):
"""
connection = sqlite3.connect(app.config["DATABASE_URI"])
cursor = connection.cursor()
cursor.execute("INSERT INTO event_logs(device_id, user_key, operation_type) VALUES (?, ?, ?)",
(device_id, user_key, operation),
)
cursor.execute("INSERT INTO event_logs(device_id, user_key, operation_type)"
"VALUES (?, ?, ?)",
(device_id, user_key, operation),
)
connection.commit()
connection.close()

@classmethod
def get_full_log(cls, start_time=None, end_time=None, limit=100, offset=0):
"""
Retrieve event logs from a SQLite database within a specified time range and limit the number
of results.
Retrieve event logs from a SQLite database within a specified time range and limit the
number of results.
Args:
start_time (str, optional): The start time of the time range to filter the logs. Should
Expand All @@ -50,7 +53,7 @@ def get_full_log(cls, start_time=None, end_time=None, limit=100, offset=0):
Example:
Retrieve logs for a specific time range and limit the results
logs = query_event_logs(start_time='2023-01-01 00:00:00', end_time='2023-01-31 23:59:59',
logs = query_event_logs(start_time='2023-01-01 00:00:00',end_time='2023-01-31 23:59:59',
limit=50)
"""
connection = sqlite3.connect(app.config["DATABASE_URI"])
Expand Down
7 changes: 5 additions & 2 deletions app/models/admin_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from flask_login import UserMixin
from flask import current_app as app


class AdminUser(UserMixin):
def __init__(self, username, password=None):
self.username = username
Expand All @@ -19,10 +20,11 @@ def check_password(self, password):
result = cursor.fetchone()
connection.close()
if not result:
app.logger.warning(f"Username not found")
app.logger.warning("Username not found")
return False
try:
app.logger.info(f"Verify hash result:", argon2.PasswordHasher().verify(result[0], password))
app.logger.info("Verify hash result:",
argon2.PasswordHasher().verify(result[0], password))
return True
except argon2.exceptions.VerifyMismatchError:
app.logger.warning("Wrong password")
Expand All @@ -36,5 +38,6 @@ def create_user(self):
connection.commit()
connection.close()

# pylint: disable=invalid-overridden-method
def is_authenticated(self):
return True
17 changes: 10 additions & 7 deletions app/models/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ def get_all_devices(cls):
def get_authorized_users(cls, device_id):
"""
Retrieves the list of authorized users for a specific device.
Args:
device_id (int): The ID of the device to check permissions for.
Returns:
list: A list of user keys authorized to access the specified device.
"""
Expand All @@ -56,7 +56,8 @@ def get_authorized_users(cls, device_id):

# Fetch authorized users for the given device
cursor.execute(
"SELECT users.key FROM users INNER JOIN permissions ON users.key = permissions.user_key WHERE permissions.device_id = ?",
"SELECT users.key FROM users INNER JOIN permissions"
"ON users.key = permissions.user_key WHERE permissions.device_id = ?",
(device_id,),
)
authorized_users = [row[0] for row in cursor.fetchall()]
Expand Down Expand Up @@ -101,7 +102,8 @@ def update_device(self, new_device_type, new_name):
cursor = connection.cursor()

if new_device_type is not None:
cursor.execute("UPDATE devices SET type = ? WHERE id = ?", (new_device_type, self.device_id))
cursor.execute("UPDATE devices SET type = ? WHERE id = ?",
(new_device_type, self.device_id))

if new_name is not None:
cursor.execute("UPDATE devices SET name = ? WHERE id = ?", (new_name, self.device_id))
Expand Down Expand Up @@ -134,8 +136,8 @@ def get_by_id(cls, device_id):
result = cursor.fetchone()
if result:
return Device(result[0], result[1], result[2], result[3])
else:
return None

return None

@classmethod
def get_latest_key(cls):
Expand All @@ -144,7 +146,8 @@ def get_latest_key(cls):
"""
connection = sqlite3.connect(app.config["DATABASE_URI"])
connection.row_factory = sqlite3.Row
# This query returns extended info about latest key event (user_key, operation_time, device_name, user_name)
# This query returns extended info about latest key event
# (user_key, operation_time, device_name, user_name)
query = """
SELECT el.user_key, el.operation_time, d.name AS device_name
FROM event_logs el
Expand Down
22 changes: 9 additions & 13 deletions app/models/user.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sqlite3
from flask import current_app as app


class User:
def __init__(self, name, key, slack_id=None):
self.name = name
Expand All @@ -15,8 +16,8 @@ def get_by_key(cls, key):
result = cursor.fetchone()
if result:
return User(result[0], result[1], result[2])
else:
return None

return None

def save(self):
"""
Expand Down Expand Up @@ -89,9 +90,9 @@ def get_permissions(cls, user_key=None):
# Get device permissions for the current user
device_permissions = []
cursor.execute("SELECT devices.id, devices.name FROM devices")
for row in cursor.fetchall():
device_id = row[0]
device_name = row[1]
for result in cursor.fetchall():
device_id = result[0]
device_name = result[1]

# Check if device-user pair exists in permissions table
cursor.execute(
Expand All @@ -101,10 +102,7 @@ def get_permissions(cls, user_key=None):
exists_result = cursor.fetchone()

# Set allowed flag based on existence in permissions table
if exists_result:
allowed = True # Explicit permission found
else:
allowed = False # No explicit permission found
allowed = bool(exists_result is not None) # Default to False

# Append device information to the permissions list
device_permissions.append(
Expand Down Expand Up @@ -138,10 +136,8 @@ def has_permission_for_device(self, device_id):
(self.key, device_id),
)
result = cursor.fetchone()
if result:
return True
else:
return False

return bool(result is not None)

def add_permission(self, device_id):
# Connect to the database
Expand Down
Loading

0 comments on commit 761b657

Please sign in to comment.