From 741d9d91f42c6c2a0280b9faa93156d7588615cc Mon Sep 17 00:00:00 2001 From: wdg <2515717245@qq.com> Date: Fri, 10 Jun 2022 21:42:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81mongodb=E8=BF=9B=E7=A8=8B?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=9F=A5=E7=9C=8B=20(#1563)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 修复下载权限名字错误 * 支持mongodb进程状态查看 * mongodb进程状态查看、kill功能迁移到engine中 * mongodb进程状态查bug修复 Co-authored-by: Leo Q Co-authored-by: 小圈圈 --- common/static/dist/js/utils.js | 24 ++- common/utils/extend_json_encoder.py | 33 ++++ sql/db_diagnostic.py | 152 ++++++++++----- sql/engines/models.py | 5 +- sql/engines/mongo.py | 74 +++++++ sql/engines/tests.py | 21 +- sql/templates/dbdiagnostic.html | 290 +++++++++++++++++++++------- 7 files changed, 478 insertions(+), 121 deletions(-) diff --git a/common/static/dist/js/utils.js b/common/static/dist/js/utils.js index c44f02f731..8d42567c05 100644 --- a/common/static/dist/js/utils.js +++ b/common/static/dist/js/utils.js @@ -24,6 +24,28 @@ var dateFormat = function(fmt, date) { return fmt; }; +// 格式与高亮json格式的字符串 +var jsonHighLight = function(json) { + json = json.toString().replace(/&/g, '&').replace(//g, '>'); + return json.replace(/("(\\u[a-zA-Z0-9]{4}|\\[^u]|[^\\"])*"(\s*:)?|\b(true|false|null)\b|-?\d+(?:\.\d*)?(?:[eE][+\-]?\d+)?)/g, function (match) { + var cls = 'text-muted'; + if (/^"/.test(match)) { + if (/:$/.test(match)) { + cls = 'text-success'; + } else { + match = match + cls = 'text-primary'; + } + } else if (/true|false/.test(match)) { + cls = 'text-success'; + } else if (/null/.test(match)) { + cls = 'text-warning'; + } + return '' + match + ''; + }); +}; + +// 这个函数存在报错,因此不应该把任何模块放在这个模块之后 // 实例配置页面根据db_type选择显示或隐藏mode字段,mode字段只适用于redis实例 (function($) { $(function() { @@ -40,4 +62,4 @@ var dateFormat = function(fmt, date) { toggleMode($(this).val()); }); }); -})(django && django.jQuery || jQuery); +})(django && django.jQuery || jQuery); \ No newline at end of file diff --git a/common/utils/extend_json_encoder.py b/common/utils/extend_json_encoder.py index d7c525b8bb..8f8837ad1c 100644 --- a/common/utils/extend_json_encoder.py +++ b/common/utils/extend_json_encoder.py @@ -1,4 +1,5 @@ # -*- coding: UTF-8 -*- +import base64 import simplejson as json from decimal import Decimal @@ -6,6 +7,8 @@ from functools import singledispatch from ipaddress import IPv4Address, IPv6Address from uuid import UUID +from bson.objectid import ObjectId +from bson.timestamp import Timestamp @singledispatch @@ -58,6 +61,16 @@ def _(o): return str(o) +@convert.register(ObjectId) +def _(o): + return str(o) + + +@convert.register(Timestamp) +def _(o): + return str(o) + + class ExtendJSONEncoder(json.JSONEncoder): def default(self, obj): try: @@ -76,3 +89,23 @@ def default(self, obj): return convert(obj) except TypeError: return super(ExtendJSONEncoderFTime, self).default(obj) + + +# 使用simplejson处理形如 b'\xaa' 的bytes类型数据会失败,但使用json模块构造这个对象时不能使用bigint_as_string方法 +import json +class ExtendJSONEncoderBytes(json.JSONEncoder): + def default(self, obj): + try: + # 使用convert.register处理会报错 ValueError: Circular reference detected + # 不是utf-8格式的bytes格式需要先进行base64编码转换 + if isinstance(obj, bytes): + try: + return o.decode('utf-8') + except: + return base64.b64encode(obj).decode('utf-8') + else: + return convert(obj) + except TypeError: + print(type(obj)) + return super(ExtendJSONEncoderBytes, self).default(obj) + diff --git a/sql/db_diagnostic.py b/sql/db_diagnostic.py index 48965ad608..ad1f8b7599 100644 --- a/sql/db_diagnostic.py +++ b/sql/db_diagnostic.py @@ -1,18 +1,21 @@ +import logging +import traceback import MySQLdb - -import simplejson as json +#import simplejson as json +import json from django.contrib.auth.decorators import permission_required from django.http import HttpResponse from sql.engines import get_engine -from common.utils.extend_json_encoder import ExtendJSONEncoder +from common.utils.extend_json_encoder import ExtendJSONEncoder, ExtendJSONEncoderBytes from sql.utils.resource_group import user_instances from .models import AliyunRdsConfig, Instance from .aliyun_rds import process_status as aliyun_process_status, create_kill_session as aliyun_create_kill_session, \ kill_session as aliyun_kill_session, sapce_status as aliyun_sapce_status +logger = logging.getLogger('default') # 问题诊断--进程列表 @permission_required('sql.process_view', raise_exception=True) @@ -21,69 +24,95 @@ def process(request): command_type = request.POST.get('command_type') try: - instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name) + instance = user_instances(request.user).get(instance_name=instance_name) except Instance.DoesNotExist: result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []} return HttpResponse(json.dumps(result), content_type='application/json') - base_sql = "select id, user, host, db, command, time, state, ifnull(info,'') as info from information_schema.processlist" - # 判断是RDS还是其他实例 - if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists(): - result = aliyun_process_status(request) - else: - # escape - command_type = MySQLdb.escape_string(command_type).decode('utf-8') - - if command_type == 'All': - sql = base_sql + ";" - elif command_type == 'Not Sleep': - sql = "{} where command<>'Sleep';".format(base_sql) + query_engine = get_engine(instance=instance) + query_result = None + if instance.db_type == 'mysql': + base_sql = "select id, user, host, db, command, time, state, ifnull(info,'') as info from information_schema.processlist" + # 判断是RDS还是其他实例 + if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists(): + result = aliyun_process_status(request) else: - sql = "{} where command= '{}';".format(base_sql, command_type) - query_engine = get_engine(instance=instance) - query_result = query_engine.query('information_schema', sql) + # escape + command_type = MySQLdb.escape_string(command_type).decode('utf-8') + if not command_type: + command_type = 'Query' + if command_type == 'All': + sql = base_sql + ";" + elif command_type == 'Not Sleep': + sql = "{} where command<>'Sleep';".format(base_sql) + else: + sql = "{} where command= '{}';".format(base_sql, command_type) + + query_result = query_engine.query('information_schema', sql) + + elif instance.db_type == 'mongo': + query_result = query_engine.current_op(command_type) + print(query_result) + + else: + result = {'status': 1, 'msg': '暂时不支持%s类型数据库的进程列表查询' % instance.db_type , 'data': []} + return HttpResponse(json.dumps(result), content_type='application/json') + + if query_result: if not query_result.error: processlist = query_result.to_dict() result = {'status': 0, 'msg': 'ok', 'rows': processlist} else: result = {'status': 1, 'msg': query_result.error} + # 返回查询结果 - return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True), + # ExtendJSONEncoderBytes 使用json模块,bigint_as_string只支持simplejson + return HttpResponse(json.dumps(result, cls=ExtendJSONEncoderBytes), content_type='application/json') -# 问题诊断--通过进程id构建请求 +# 问题诊断--通过线程id构建请求 这里只是用于确定将要kill的线程id还在运行 @permission_required('sql.process_kill', raise_exception=True) def create_kill_session(request): instance_name = request.POST.get('instance_name') thread_ids = request.POST.get('ThreadIDs') try: - instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name) + instance = user_instances(request.user).get(instance_name=instance_name) except Instance.DoesNotExist: result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []} return HttpResponse(json.dumps(result), content_type='application/json') result = {'status': 0, 'msg': 'ok', 'data': []} - # 判断是RDS还是其他实例 - if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists(): - result = aliyun_create_kill_session(request) + query_engine = get_engine(instance=instance) + if instance.db_type == 'mysql': + # 判断是RDS还是其他实例 + if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists(): + result = aliyun_create_kill_session(request) + else: + thread_ids = json.loads(thread_ids) + + sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\ + .format(','.join(str(tid) for tid in thread_ids)) + all_kill_sql = query_engine.query('information_schema', sql) + kill_sql = '' + for row in all_kill_sql.rows: + kill_sql = kill_sql + row[0] + result['data'] = kill_sql + + elif instance.db_type == 'mongo': + kill_command = query_engine.get_kill_command(json.loads(thread_ids)) + result['data'] = kill_command + else: - thread_ids = json.loads(thread_ids) - query_engine = get_engine(instance=instance) - sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\ - .format(','.join(str(tid) for tid in thread_ids)) - all_kill_sql = query_engine.query('information_schema', sql) - kill_sql = '' - for row in all_kill_sql.rows: - kill_sql = kill_sql + row[0] - result['data'] = kill_sql + result = {'status': 1, 'msg': '暂时不支持%s类型数据库通过进程id构建请求' % instance.db_type , 'data': []} + return HttpResponse(json.dumps(result), content_type='application/json') # 返回查询结果 return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True), content_type='application/json') -# 问题诊断--终止会话 +# 问题诊断--终止会话 这里是实际执行kill的操作 @permission_required('sql.process_kill', raise_exception=True) def kill_session(request): instance_name = request.POST.get('instance_name') @@ -91,24 +120,33 @@ def kill_session(request): result = {'status': 0, 'msg': 'ok', 'data': []} try: - instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name) + instance = user_instances(request.user).get(instance_name=instance_name) except Instance.DoesNotExist: result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []} return HttpResponse(json.dumps(result), content_type='application/json') - # 判断是RDS还是其他实例 - if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists(): - result = aliyun_kill_session(request) + engine = get_engine(instance=instance) + if instance.db_type == 'mysql': + # 判断是RDS还是其他实例 + if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists(): + result = aliyun_kill_session(request) + else: + thread_ids = json.loads(thread_ids) + + sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\ + .format(','.join(str(tid) for tid in thread_ids)) + all_kill_sql = engine.query('information_schema', sql) + kill_sql = '' + for row in all_kill_sql.rows: + kill_sql = kill_sql + row[0] + engine.execute('information_schema', kill_sql) + + elif instance.db_type == 'mongo': + engine.kill_op(json.loads(thread_ids)) + else: - thread_ids = json.loads(thread_ids) - engine = get_engine(instance=instance) - sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\ - .format(','.join(str(tid) for tid in thread_ids)) - all_kill_sql = engine.query('information_schema', sql) - kill_sql = '' - for row in all_kill_sql.rows: - kill_sql = kill_sql + row[0] - engine.execute('information_schema', kill_sql) + result = {'status': 1, 'msg': '暂时不支持%s类型数据库终止会话' % instance.db_type , 'data': []} + return HttpResponse(json.dumps(result), content_type='application/json') # 返回查询结果 return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True), @@ -121,11 +159,15 @@ def tablesapce(request): instance_name = request.POST.get('instance_name') try: - instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name) + instance = user_instances(request.user).get(instance_name=instance_name) except Instance.DoesNotExist: result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []} return HttpResponse(json.dumps(result), content_type='application/json') + if instance.db_type != 'mysql': + result = {'status': 1, 'msg': '暂时不支持%s类型数据库的表空间信息查询' % instance.db_type , 'data': []} + return HttpResponse(json.dumps(result), content_type='application/json') + # 判断是RDS还是其他实例 if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists(): result = aliyun_sapce_status(request) @@ -164,11 +206,15 @@ def trxandlocks(request): instance_name = request.POST.get('instance_name') try: - instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name) + instance = user_instances(request.user).get(instance_name=instance_name) except Instance.DoesNotExist: result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []} return HttpResponse(json.dumps(result), content_type='application/json') + if instance.db_type != 'mysql': + result = {'status': 1, 'msg': '暂时不支持%s类型数据库的锁等待查询' % instance.db_type , 'data': []} + return HttpResponse(json.dumps(result), content_type='application/json') + query_engine = get_engine(instance=instance) server_version = query_engine.server_version if server_version < (8, 0, 1): @@ -247,11 +293,15 @@ def innodb_trx(request): instance_name = request.POST.get('instance_name') try: - instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name) + instance = user_instances(request.user).get(instance_name=instance_name) except Instance.DoesNotExist: result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []} return HttpResponse(json.dumps(result), content_type='application/json') + if instance.db_type != 'mysql': + result = {'status': 1, 'msg': '暂时不支持%s类型数据库的长事务查询' % instance.db_type , 'data': []} + return HttpResponse(json.dumps(result), content_type='application/json') + query_engine = get_engine(instance=instance) sql = '''select trx.trx_started, trx.trx_state, diff --git a/sql/engines/models.py b/sql/engines/models.py index cc7091ad62..fc377590eb 100644 --- a/sql/engines/models.py +++ b/sql/engines/models.py @@ -134,7 +134,10 @@ def json(self): def to_dict(self): tmp_list = [] for r in self.rows: - tmp_list += [dict(zip(self.column_list, r))] + if isinstance(r,dict): + tmp_list += [r] + else: + tmp_list += [dict(zip(self.column_list, r))] return tmp_list def to_sep_dict(self): diff --git a/sql/engines/mongo.py b/sql/engines/mongo.py index c9bc47443d..7d777ee77d 100644 --- a/sql/engines/mongo.py +++ b/sql/engines/mongo.py @@ -895,3 +895,77 @@ def fill_query_columns(cursor, columns): if key not in cols: cols.append(key) return cols + + def current_op(self, command_type): + """ + 获取当前连接信息 + + command_type: + Full 包含活跃与不活跃的连接,包含内部的连接,即全部的连接状态 + All 包含活跃与不活跃的连接,不包含内部的连接 + Active 包含活跃 + Inner 内部连接 + """ + print(command_type) + result_set = ResultSet(full_sql='db.aggregate([{"$currentOp": {"allUsers":true, "idleConnections":true}}])') + try: + conn = self.get_connection() + processlists = [] + if not command_type: + command_type = 'Active' + if command_type in ['Full','All','Inner']: + idle_connections = True + else: + idle_connections = False + + # conn.admin.current_op() 这个方法已经被pymongo废除,但mongodb3.6+才支持aggregate + with conn.admin.aggregate([{'$currentOp': {'allUsers':True,'idleConnections':idle_connections}}]) as cursor: + for operation in cursor: + # 对sharding集群的特殊处理 + if not 'client' in operation and operation.get('clientMetadata',{}).get('mongos',{}).get('client',{}): + operation['client'] = operation['clientMetadata']['mongos']['client'] + + # client_s 只是处理的mongos,并不是实际客户端 + # client 在sharding获取不到? + if command_type in ['Full']: + processlists.append(operation) + elif command_type in ['All','Active']: + if 'clientMetadata' in operation: + processlists.append(operation) + elif command_type in ['Inner']: + if not 'clientMetadata' in operation: + processlists.append(operation) + + result_set.rows = processlists + except Exception as e: + logger.warning(f'mongodb获取连接信息错误,错误信息{traceback.format_exc()}') + result_set.error = str(e) + + return result_set + + def get_kill_command(self, opids): + """由传入的opid列表生成kill字符串""" + conn = self.get_connection() + active_opid = [] + with conn.admin.aggregate([{'$currentOp': {'allUsers':True,'idleConnections': False}}]) as cursor: + for operation in cursor: + if 'opid' in operation and operation['opid'] in opids: + active_opid.append(operation['opid']) + + kill_command = '' + for opid in active_opid: + if isinstance(opid,int): + kill_command = kill_command + 'db.killOp({});'.format(opid) + else: + kill_command = kill_command + 'db.killOp("{}");'.format(opid) + + return kill_command + + def kill_op(self, opids): + """kill""" + conn = self.get_connection() + db = conn.admin + for opid in opids: + conn.admin.command({ 'killOp': 1, 'op': opid}) + + \ No newline at end of file diff --git a/sql/engines/tests.py b/sql/engines/tests.py index d8bb17b66c..9ea1fd17a2 100644 --- a/sql/engines/tests.py +++ b/sql/engines/tests.py @@ -1648,8 +1648,25 @@ def test_fill_query_columns(self): {"_id": {"$oid": "7f10162029684728e70045ab"}, "author": "archery"}] cols = self.engine.fill_query_columns(cursor, columns=columns) self.assertEqual(cols, ["_id", "title", "tags", "likes", "text", "author"]) - - + + def test_current_op(self): + command_types = ['Full','All','Inner','Active'] + for command_type in command_types: + result_set = self.engine.current_op(command_type) + self.assertIsInstance(result_set, ResultSet) + + def test_get_kill_command(self): + kill_command1 = self.engine.get_kill_command([111,222]) + kill_command2 = self.engine.get_kill_command(['shards: 111','shards: 222']) + self.assertEqual(kill_command1, 'db.killOp(111);db.killOp(222);') + self.assertEqual(kill_command2, 'db.killOp("shards: 111");db.killOp("shards: 111");') + + def test_kill_op(self): + self.engine.kill_op([111,222]) + self.engine.kill_op(['shards: 111','shards: 222']) + self.assertEqual("","") + + class TestClickHouse(TestCase): def setUp(self): diff --git a/sql/templates/dbdiagnostic.html b/sql/templates/dbdiagnostic.html index 0a0c0832cb..d246e3ff83 100644 --- a/sql/templates/dbdiagnostic.html +++ b/sql/templates/dbdiagnostic.html @@ -21,16 +21,18 @@
-
@@ -91,6 +93,173 @@