-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsqlite_explorer.py
164 lines (129 loc) · 5.26 KB
/
sqlite_explorer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from pathlib import Path
import sqlite3
import os
from typing import List, Dict, Any, Optional
from fastmcp import FastMCP
# Initialize FastMCP server
mcp = FastMCP("SQLite Explorer",
log_level="CRITICAL")
# Path to Messages database - must be provided via SQLITE_DB_PATH environment variable
if 'SQLITE_DB_PATH' not in os.environ:
raise ValueError("SQLITE_DB_PATH environment variable must be set")
DB_PATH = Path(os.environ['SQLITE_DB_PATH'])
class SQLiteConnection:
def __init__(self, db_path: Path):
self.db_path = db_path
self.conn = None
def __enter__(self):
self.conn = sqlite3.connect(str(self.db_path))
self.conn.row_factory = sqlite3.Row
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
self.conn.close()
@mcp.tool()
def read_query(
query: str,
params: Optional[List[Any]] = None,
fetch_all: bool = True,
row_limit: int = 1000
) -> List[Dict[str, Any]]:
"""Execute a query on the Messages database.
Args:
query: SELECT SQL query to execute
params: Optional list of parameters for the query
fetch_all: If True, fetches all results. If False, fetches one row.
row_limit: Maximum number of rows to return (default 1000)
Returns:
List of dictionaries containing the query results
"""
if not DB_PATH.exists():
raise FileNotFoundError(f"Messages database not found at: {DB_PATH}")
# Clean and validate the query
query = query.strip()
# Remove trailing semicolon if present
if query.endswith(';'):
query = query[:-1].strip()
# Check for multiple statements by looking for semicolons not inside quotes
def contains_multiple_statements(sql: str) -> bool:
in_single_quote = False
in_double_quote = False
for char in sql:
if char == "'" and not in_double_quote:
in_single_quote = not in_single_quote
elif char == '"' and not in_single_quote:
in_double_quote = not in_double_quote
elif char == ';' and not in_single_quote and not in_double_quote:
return True
return False
if contains_multiple_statements(query):
raise ValueError("Multiple SQL statements are not allowed")
# Validate query type (allowing common CTEs)
query_lower = query.lower()
if not any(query_lower.startswith(prefix) for prefix in ('select', 'with')):
raise ValueError("Only SELECT queries (including WITH clauses) are allowed for safety")
params = params or []
with SQLiteConnection(DB_PATH) as conn:
cursor = conn.cursor()
try:
# Only add LIMIT if query doesn't already have one
if 'limit' not in query_lower:
query = f"{query} LIMIT {row_limit}"
cursor.execute(query, params)
if fetch_all:
results = cursor.fetchall()
else:
results = [cursor.fetchone()]
return [dict(row) for row in results if row is not None]
except sqlite3.Error as e:
raise ValueError(f"SQLite error: {str(e)}")
@mcp.tool()
def list_tables() -> List[str]:
"""List all tables in the Messages database.
Returns:
List of table names in the database
"""
if not DB_PATH.exists():
raise FileNotFoundError(f"Messages database not found at: {DB_PATH}")
with SQLiteConnection(DB_PATH) as conn:
cursor = conn.cursor()
try:
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table'
ORDER BY name
""")
return [row['name'] for row in cursor.fetchall()]
except sqlite3.Error as e:
raise ValueError(f"SQLite error: {str(e)}")
@mcp.tool()
def describe_table(table_name: str) -> List[Dict[str, str]]:
"""Get detailed information about a table's schema.
Args:
table_name: Name of the table to describe
Returns:
List of dictionaries containing column information:
- name: Column name
- type: Column data type
- notnull: Whether the column can contain NULL values
- dflt_value: Default value for the column
- pk: Whether the column is part of the primary key
"""
if not DB_PATH.exists():
raise FileNotFoundError(f"Messages database not found at: {DB_PATH}")
with SQLiteConnection(DB_PATH) as conn:
cursor = conn.cursor()
try:
# Verify table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name=?
""", [table_name])
if not cursor.fetchone():
raise ValueError(f"Table '{table_name}' does not exist")
# Get table schema
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
return [dict(row) for row in columns]
except sqlite3.Error as e:
raise ValueError(f"SQLite error: {str(e)}")