From 7d8e955ed9251574ecd789d205dcd9f81fdd10cc Mon Sep 17 00:00:00 2001 From: cpzt Date: Thu, 27 Jan 2022 19:41:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=9F=A5=E8=AF=A2ODPS?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=20(#1363)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * odps engine * odps engine * odps engine fix --- requirements.txt | 1 + sql/engines/__init__.py | 4 + sql/engines/odps.py | 128 ++++++++++++++++++++++++++++++ sql/models.py | 1 + sql/templates/instance.html | 1 + sql/templates/queryapplylist.html | 4 + sql/templates/sqlquery.html | 4 + 7 files changed, 143 insertions(+) create mode 100644 sql/engines/odps.py diff --git a/requirements.txt b/requirements.txt index 9b0b5ed2e9..67531c2415 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,4 @@ uvloop==0.14.0 httptools==0.1.1 uvicorn==0.12.2 pycryptodome==3.10.1 +pyodps==0.10.7.1 diff --git a/sql/engines/__init__.py b/sql/engines/__init__.py index d542c4a899..728a3429ca 100644 --- a/sql/engines/__init__.py +++ b/sql/engines/__init__.py @@ -178,3 +178,7 @@ def get_engine(instance=None): # pragma: no cover from .phoenix import PhoenixEngine return PhoenixEngine(instance=instance) + + elif instance.db_type == 'odps': + from .odps import ODPSEngine + return ODPSEngine(instance=instance) diff --git a/sql/engines/odps.py b/sql/engines/odps.py new file mode 100644 index 0000000000..4c0e0c6de0 --- /dev/null +++ b/sql/engines/odps.py @@ -0,0 +1,128 @@ +# -*- coding: UTF-8 -*- + +import re +import logging + +from . import EngineBase +from .models import ResultSet, ReviewSet, ReviewResult + +from odps import ODPS + + +logger = logging.getLogger('default') + + +class ODPSEngine(EngineBase): + + def get_connection(self, db_name=None): + if self.conn: + return self.conn + + db_name = db_name if db_name else self.instance.db_name + + if db_name is None: + raise ValueError("db_name不能为空") + + self.conn = ODPS(self.user, self.password, project=db_name, endpoint=self.host) + + return self.conn + + @property + def name(self): + return 'ODPS' + + @property + def info(self): + return 'ODPS engine' + + def get_all_databases(self): + """获取数据库列表, 返回一个ResultSet + ODPS只有project概念, 直接返回project名称 + """ + result = ResultSet() + + try: + conn = self.get_connection(self.get_connection()) + result.rows = [conn.project] + except Exception as e: + logger.warning(f"ODPS执行异常, {e}") + result.rows = [self.instance.db_name] + return result + + def get_all_tables(self, db_name, **kwargs): + """获取table 列表, 返回一个ResultSet""" + + db_name = db_name if db_name else self.instance.db_name + result_set = ResultSet() + + try: + conn = self.get_connection(db_name=db_name) + + rows = [t.name for t in conn.list_tables()] + result_set.rows = rows + + except Exception as e: + logger.warning(f"ODPS语句执行报错, 错误信息{e}") + result_set.error = str(e) + + return result_set + + def get_all_columns_by_tb(self, db_name, tb_name, **kwargs): + """获取所有字段, 返回一个ResultSet""" + + column_list = ['COLUMN_NAME', 'COLUMN_TYPE', 'COLUMN_COMMENT'] + + conn = self.get_connection(db_name) + + table = conn.get_table(tb_name) + + schema_cols = table.schema.columns + + rows = [] + + for col in schema_cols: + rows.append([col.name, str(col.type), col.comment]) + + result = ResultSet() + result.column_list = column_list + result.rows = rows + return result + + def describe_table(self, db_name, tb_name, **kwargs): + """return ResultSet 类似查询""" + + result = self.get_all_columns_by_tb(db_name, tb_name) + + return result + + def query(self, db_name=None, sql='', limit_num=0, close_conn=True, **kwargs): + """返回 ResultSet """ + result_set = ResultSet(full_sql=sql) + + if not re.match(r"^select", sql, re.I): + result_set.error = str("仅支持ODPS查询语句") + + # 存在limit,替换limit; 不存在,添加limit + if re.search('limit', sql): + sql = re.sub('limit.+(\d+)', 'limit ' + str(limit_num), sql) + else: + if sql.strip()[-1] == ';': + sql = sql[:-1] + sql = sql + ' limit ' + str(limit_num) + ';' + + try: + conn = self.get_connection(db_name) + effect_row = conn.execute_sql(sql) + reader = effect_row.open_reader() + rows = [row.values for row in reader] + column_list = getattr(reader, '_schema').names + + result_set.column_list = column_list + result_set.rows = rows + result_set.affected_rows = len(rows) + + except Exception as e: + logger.warning(f"ODPS语句执行报错, 语句:{sql},错误信息{e}") + result_set.error = str(e) + return result_set + diff --git a/sql/models.py b/sql/models.py index 00ab1edb50..552d24c505 100755 --- a/sql/models.py +++ b/sql/models.py @@ -86,6 +86,7 @@ class Meta: ('oracle', 'Oracle'), ('mongo', 'Mongo'), ('phoenix', 'Phoenix'), + ('odps', 'ODPS'), ('goinception', 'goInception')) diff --git a/sql/templates/instance.html b/sql/templates/instance.html index 03c43adbf9..43562972b1 100644 --- a/sql/templates/instance.html +++ b/sql/templates/instance.html @@ -20,6 +20,7 @@ +
diff --git a/sql/templates/queryapplylist.html b/sql/templates/queryapplylist.html index d8270a0cb8..bd358f7b2f 100644 --- a/sql/templates/queryapplylist.html +++ b/sql/templates/queryapplylist.html @@ -54,6 +54,7 @@ +
@@ -163,6 +164,7 @@ $("#optgroup-oracle").empty(); $("#optgroup-mongo").empty(); $("#optgroup-phoenix").empty(); + $("#optgroup-odps").empty(); for (var i = 0; i < result.length; i++) { var instance = ""; if (result[i]['db_type'] === 'mysql') { @@ -179,6 +181,8 @@ $("#optgroup-mongo").append(instance); } else if (result[i]['db_type'] === 'phoenix') { $("#optgroup-phoenix").append(instance); + } else if (result[i]['db_type'] === 'odps') { + $("#optgroup-odps").append(instance); } } $('#instance_name').selectpicker('render'); diff --git a/sql/templates/sqlquery.html b/sql/templates/sqlquery.html index 332703b61d..984cfd30bc 100644 --- a/sql/templates/sqlquery.html +++ b/sql/templates/sqlquery.html @@ -78,6 +78,7 @@ +
@@ -1262,6 +1263,7 @@ $("#optgroup-pgsql").empty(); $("#optgroup-mongo").empty(); $("#optgroup-phoenix").empty(); + $("#optgroup-odps").empty(); for (let i = 0; i < result.length; i++) { let instance = ""; if (result[i]['db_type'] === 'mysql') { @@ -1278,6 +1280,8 @@ $("#optgroup-mongo").append(instance); } else if (result[i]['db_type'] === 'phoenix') { $("#optgroup-phoenix").append(instance); + } else if (result[i]['db_type'] === 'odps') { + $("#optgroup-odps").append(instance); } } $('#instance_name').selectpicker('render');