From c125e4aac382f72eca172dedbee8e5c8c4f5a004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 9 May 2024 16:32:21 +0800 Subject: [PATCH 01/33] =?UTF-8?q?=E6=B7=BB=E5=8A=A0favicon=E5=9B=BE?= =?UTF-8?q?=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/static/img/favicon.ico | Bin 0 -> 3553 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/common/static/img/favicon.ico b/common/static/img/favicon.ico index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..5a618b8d56f68f522b70f6c588b9588188931a95 100644 GIT binary patch literal 3553 zcmV<74Ic7|P)(=xOa z5*UhM8p+YFBrCohhmZ_SI)(-c(}CE*p1#MB-VemdJ80Qsq`HBCJ(lP zC$$Wj>0-#Iy{Xi#SNnIawb`gVi`{oA@&BJ^jybvg}nocPmP`}gL))bBFh zIc4$1!q>=zcZpiLSkU^U%ZtFjH&PzAiTcx({ML_|CyZ;?eLOOm_))9 z!tKi*hgFGW#H~Ik)reZVTO_<#$OdzmSUeVCJ{Nz7%S7#VnW?9y`aAO%%uZN0f6j$< z3G-l1w((NpqrtZYOcy*$7yCH|qUir50XC;USnbdedz?DryCCsJxn7=b#KPBy5qrl+Qigf$^rwSOphj(JZoAP_9QO8BhW zJolE`g|p+t;bP*cT1I^jiaJ{TXswBSverQUzRpO#?lzN|ZWHxxLlVi>o5_ZHGc_O= zyHOq~vXFf)H$`lQJBj+b!A$9=B0h@z7v~KB`<44sj7wJfg2azz?H`IkmQEELz_awH zSM&Spjdv28Q^=~svYKy7QN!^_m4)~-nJHp3gp^eu=oMt8OH14qsrc(& zn4?9XGZ}ADaucFj`-j5M(l1GR07utE^Z$bY%<`mDk2j@gmU~jw&7M@vMNgUrjz{W5 z$W4)$A?p$KNmSQnBk@qA;tR-4h-&R0ioqiNld>LgU^@SQZJ=5!4V zgxHX@U^4_GBs~I|V4}WtC#j!}RD6NtH*(ab>{WY>OIB?K0L+@s-w*ujjT-3~zzl&I zz?J~6_oS&8c+=Ipyn+#-3=zx-VZ>-kBF}RP#BikI&p0qA=Q=;joSWzY%t+7LjGxTd z^8hf*n-Tx2H$&6y&Coy;VlzS-Au|C*@oNPB;tTjWx?hrpIQ`u-{Y?AJ;6`s|e6lxF zb5<}y+%v-V29XKetHed@#YgZgvrTNk5U2kwzyxn974OZ|xLPvfVUQ7Hy&;?l^=9fq zy_xzmz#TuwR7iS25%U_r2nfKemP~b4OQz-ru@RA&;5L!pxlNjf0nYFo^O0mAE@ZL+ zhO}hd{>#=(&1qzUV1zg&iA@lX32qbB8ET$@4StqhDl0`l1emflbM9R&S(^4qO^{bd zNu(X%3Q9TB$rw`sFlNIm1nn}bM}ka{rz7cj04H0vm@$S}`Zv-lh(;3+7Vhs z1-JnV%s28JeNnU?^aF&`JSCrmaLWsyzZ{p;k`;e=ygHJN2SKH2V1tMJ9l#BqWq5Hf z7%XHcfC>4NtnIWey?tJ5mgdx?y}&%NsRMAs&(V8jld&fN2INP$u!c%hG*1xE5yZVf zS`ksuWuTq|IKgv_Q#ueEUYO};%Aew@W@|?LGI=iuSrLtSIxSVRNTBUEMKRHH6X-I#2*`;7oDi{j$B` zwpbKjAh4+u!8lN7Am0Z#Fi>E=MYc70Hde(S-&spn?;1}-VGKZLQ2{O>(Isi9MEBEJ z6kh=CF^3w6B?cI&;b@%7zARAM5gTxZrGF8t;v>)ii%wev8L(Jl07NIT0S@pi!ky+K^x2BBN{X3-sMTt3|}Gzp);=e00(%Ed0SSB zE{|pL1)5UTp0I;3>H&<(4RFBE>62vp0&8Pkd;~fMc}W8dszU$=28)>IWj(+i>*6E4 zsniqU4Ip71O%4DY@U!%zvdP%4SQlTI6Evn!{xI_ZSj(us2RLwYy^)X^a4Oct7idhT zEb_%T^g;=Gxugac0}y_e`CfERbQszau8(!`5shXlBXs`9%mAKadc?ipzZ(p<$GZ54 z#>JXv!WuwhWPqQgn(p>E19?axI9Ds86Ealcyi_&T~~FQ3`MY4Z5-(;Nf{*Juoo1yTAVg?Cm`Y zJ3Bj}&*y{t_wR?hcI|@OwrztgEiJHUbsDzfBLeR+i&{TQKgfXUMFc*Z^JgsPxe^1) zGI|FF27LYf{Q>biIy&Lu!#=og->CRow{8uHU%d8R_}PX+WWa@wU%YL~*WOGA=w)(w z>ql??J9R`R!0Dwuo}4aacublXyZi?f1>rPV6|5DM}QN4mN^j+zjHr4e!LI%^z^{?_I7ya&>^^Y z?_RiL$4!iT^oQ0t?up3jkx>H2EuO>KVNQ16?`=aDp$(+9QbH-Q5F^ z9BGFK4+`S%7#F{^Y~#52V{r~(jN26Va3J-JmY#vjM;toUv>iT(h{3@@>SVX?n`6h0 z!>+Dw*w%Ig9z1vu?%A^kZr{Egwzjsy=H})g@z)nwus5K}NZluI{Z1XR1>hupfBy;J z(WA#;S63HoYiolC4jh2HcTX<Ut4)|7sBVKVVc$~Sh}Y&N{gxez@-eh(f>OEV_^@#B3y zq0F49@pnkXud{v{GUXJGrW{yot$<%x*^v`P$^Bz@PT0_C3ZA7`@En~mRLDH#FJ$h4 zIXO4rXn-r@zL~ih=IGJmySux);gKUknK>aos&pC~8)04PCB!c+hs9+Zp}mZSCySEB zYez6_ptGkevk%?M#j|ul;2csBWA+y^&+POC{1 zy{Vn&=ti`!;pdnSyjc&{$!?8z0vs7nImM;pgRQ0921_e8U3Rr|(w1;D8jz7msI{V2 ziw5k`fg71b527p6qG?Lsx>sS9PI^PILr44-;D%(dm55&()l##M)8B+lLe&$hqWmnK zg<38$2~}Bvl*cl|S7O4pHy)E3pe25A=v2$A|8^@0a6w$@SSmKbvW@nri$9gSEzo%- zbmb<`((?nA6vupk(hN#BDD8+3n91W*ffl-qcZ9nm`&8w^xaYx4BMi2s6`NqWtr8pY zBL!VHGQ{cQ1NWj985W?^9q5|ovAeQ8J!=;9RqHhu!kBOp%rvnqex%rSagR8(s>4_s zVBPqIx8jS+y>`2OW~S=XM1dLH=+F^HhmL4Pn=?rKDqS4@H!UhCC@3f>C@3f>C@3f> b037%~u&xOfNpM@h00000NkvXXu0mjfKBBW? 
literal 0 HcmV?d00001 From 5b0c5f97238be9f2f4bb3372a9f06d9199295893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 20 Jun 2024 17:05:35 +0800 Subject: [PATCH 02/33] firset --- common/static/img/favicon.ico | Bin 3553 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/common/static/img/favicon.ico b/common/static/img/favicon.ico index 5a618b8d56f68f522b70f6c588b9588188931a95..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 GIT binary patch literal 0 HcmV?d00001 literal 3553 zcmV<74Ic7|P)(=xOa z5*UhM8p+YFBrCohhmZ_SI)(-c(}CE*p1#MB-VemdJ80Qsq`HBCJ(lP zC$$Wj>0-#Iy{Xi#SNnIawb`gVi`{oA@&BJ^jybvg}nocPmP`}gL))bBFh zIc4$1!q>=zcZpiLSkU^U%ZtFjH&PzAiTcx({ML_|CyZ;?eLOOm_))9 z!tKi*hgFGW#H~Ik)reZVTO_<#$OdzmSUeVCJ{Nz7%S7#VnW?9y`aAO%%uZN0f6j$< z3G-l1w((NpqrtZYOcy*$7yCH|qUir50XC;USnbdedz?DryCCsJxn7=b#KPBy5qrl+Qigf$^rwSOphj(JZoAP_9QO8BhW zJolE`g|p+t;bP*cT1I^jiaJ{TXswBSverQUzRpO#?lzN|ZWHxxLlVi>o5_ZHGc_O= zyHOq~vXFf)H$`lQJBj+b!A$9=B0h@z7v~KB`<44sj7wJfg2azz?H`IkmQEELz_awH zSM&Spjdv28Q^=~svYKy7QN!^_m4)~-nJHp3gp^eu=oMt8OH14qsrc(& zn4?9XGZ}ADaucFj`-j5M(l1GR07utE^Z$bY%<`mDk2j@gmU~jw&7M@vMNgUrjz{W5 z$W4)$A?p$KNmSQnBk@qA;tR-4h-&R0ioqiNld>LgU^@SQZJ=5!4V zgxHX@U^4_GBs~I|V4}WtC#j!}RD6NtH*(ab>{WY>OIB?K0L+@s-w*ujjT-3~zzl&I zz?J~6_oS&8c+=Ipyn+#-3=zx-VZ>-kBF}RP#BikI&p0qA=Q=;joSWzY%t+7LjGxTd z^8hf*n-Tx2H$&6y&Coy;VlzS-Au|C*@oNPB;tTjWx?hrpIQ`u-{Y?AJ;6`s|e6lxF zb5<}y+%v-V29XKetHed@#YgZgvrTNk5U2kwzyxn974OZ|xLPvfVUQ7Hy&;?l^=9fq zy_xzmz#TuwR7iS25%U_r2nfKemP~b4OQz-ru@RA&;5L!pxlNjf0nYFo^O0mAE@ZL+ zhO}hd{>#=(&1qzUV1zg&iA@lX32qbB8ET$@4StqhDl0`l1emflbM9R&S(^4qO^{bd zNu(X%3Q9TB$rw`sFlNIm1nn}bM}ka{rz7cj04H0vm@$S}`Zv-lh(;3+7Vhs z1-JnV%s28JeNnU?^aF&`JSCrmaLWsyzZ{p;k`;e=ygHJN2SKH2V1tMJ9l#BqWq5Hf z7%XHcfC>4NtnIWey?tJ5mgdx?y}&%NsRMAs&(V8jld&fN2INP$u!c%hG*1xE5yZVf zS`ksuWuTq|IKgv_Q#ueEUYO};%Aew@W@|?LGI=iuSrLtSIxSVRNTBUEMKRHH6X-I#2*`;7oDi{j$B` zwpbKjAh4+u!8lN7Am0Z#Fi>E=MYc70Hde(S-&spn?;1}-VGKZLQ2{O>(Isi9MEBEJ z6kh=CF^3w6B?cI&;b@%7zARAM5gTxZrGF8t;v>)ii%wev8L(Jl07NIT0S@pi!ky+K^x2BBN{X3-sMTt3|}Gzp);=e00(%Ed0SSB zE{|pL1)5UTp0I;3>H&<(4RFBE>62vp0&8Pkd;~fMc}W8dszU$=28)>IWj(+i>*6E4 zsniqU4Ip71O%4DY@U!%zvdP%4SQlTI6Evn!{xI_ZSj(us2RLwYy^)X^a4Oct7idhT zEb_%T^g;=Gxugac0}y_e`CfERbQszau8(!`5shXlBXs`9%mAKadc?ipzZ(p<$GZ54 z#>JXv!WuwhWPqQgn(p>E19?axI9Ds86Ealcyi_&T~~FQ3`MY4Z5-(;Nf{*Juoo1yTAVg?Cm`Y zJ3Bj}&*y{t_wR?hcI|@OwrztgEiJHUbsDzfBLeR+i&{TQKgfXUMFc*Z^JgsPxe^1) zGI|FF27LYf{Q>biIy&Lu!#=og->CRow{8uHU%d8R_}PX+WWa@wU%YL~*WOGA=w)(w z>ql??J9R`R!0Dwuo}4aacublXyZi?f1>rPV6|5DM}QN4mN^j+zjHr4e!LI%^z^{?_I7ya&>^^Y z?_RiL$4!iT^oQ0t?up3jkx>H2EuO>KVNQ16?`=aDp$(+9QbH-Q5F^ z9BGFK4+`S%7#F{^Y~#52V{r~(jN26Va3J-JmY#vjM;toUv>iT(h{3@@>SVX?n`6h0 z!>+Dw*w%Ig9z1vu?%A^kZr{Egwzjsy=H})g@z)nwus5K}NZluI{Z1XR1>hupfBy;J z(WA#;S63HoYiolC4jh2HcTX<Ut4)|7sBVKVVc$~Sh}Y&N{gxez@-eh(f>OEV_^@#B3y zq0F49@pnkXud{v{GUXJGrW{yot$<%x*^v`P$^Bz@PT0_C3ZA7`@En~mRLDH#FJ$h4 zIXO4rXn-r@zL~ih=IGJmySux);gKUknK>aos&pC~8)04PCB!c+hs9+Zp}mZSCySEB zYez6_ptGkevk%?M#j|ul;2csBWA+y^&+POC{1 zy{Vn&=ti`!;pdnSyjc&{$!?8z0vs7nImM;pgRQ0921_e8U3Rr|(w1;D8jz7msI{V2 ziw5k`fg71b527p6qG?Lsx>sS9PI^PILr44-;D%(dm55&()l##M)8B+lLe&$hqWmnK zg<38$2~}Bvl*cl|S7O4pHy)E3pe25A=v2$A|8^@0a6w$@SSmKbvW@nri$9gSEzo%- zbmb<`((?nA6vupk(hN#BDD8+3n91W*ffl-qcZ9nm`&8w^xaYx4BMi2s6`NqWtr8pY zBL!VHGQ{cQ1NWj985W?^9q5|ovAeQ8J!=;9RqHhu!kBOp%rvnqex%rSagR8(s>4_s zVBPqIx8jS+y>`2OW~S=XM1dLH=+F^HhmL4Pn=?rKDqS4@H!UhCC@3f>C@3f>C@3f> b037%~u&xOfNpM@h00000NkvXXu0mjfKBBW? 
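The patches from PATCH 05/33 onward add an Elasticsearch engine whose query() accepts a REST-style request string (a method line such as GET /dmp__iv/_search, optionally followed by a JSON body) and whose query_check() only passes statements beginning with "get" or "select". A minimal usage sketch follows, mirroring the setUp and query calls in the new sql/engines/test_elasticsearch.py; the host, port and credential values below are assumptions for illustration and require a reachable cluster, while dmp__iv is the index name used in the engine's own example message.

# Minimal usage sketch, assuming a reachable Elasticsearch instance; the connection
# values and credentials are placeholders. It exercises the same entry points the
# unit tests added in PATCH 13/33 call (query_check, then query with a limit).
from sql.models import Instance
from sql.engines.elasticsearch import ElasticsearchEngine

instance = Instance()
instance.host, instance.port = "127.0.0.1", 9200        # assumed connection values
instance.user, instance.password = "elastic", "secret"  # assumed credentials
instance.is_ssl = False

engine = ElasticsearchEngine(instance=instance)

sql = 'GET /dmp__iv/_search\n{"query": {"match_all": {}}}'

check = engine.query_check(sql=sql)                # passes: the statement starts with "GET "
if not check["bad_query"]:
    result = engine.query(sql=sql, limit_num=10)   # limit_num is injected as "size" into the body
    print(result.column_list)
    print(result.rows)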
From 9a26c932512ab461c3588f0d84693181851d6ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 20 Jun 2024 17:53:19 +0800 Subject: [PATCH 03/33] =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.list | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.env.list b/.env.list index 87f3fc5549..c7bde7b8ba 100644 --- a/.env.list +++ b/.env.list @@ -1,5 +1,5 @@ # https://django-environ.readthedocs.io/en/latest/quickstart.html#usage -# https://docs.djangoproject.com/zh-hans/4.1/ref/settings/ +# https://docs.djangoproject.com/zh-hans/4.1/ref/settings/ DEBUG=false DATABASE_URL=mysql://root:@127.0.0.1:3306/archery CACHE_URL=redis://127.0.0.1:6379/0 From 9793deb485c3434ba78aba3ef7e912ac2ec0ad64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 20 Jun 2024 17:53:56 +0800 Subject: [PATCH 04/33] =?UTF-8?q?=E6=92=A4=E9=94=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.list | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.env.list b/.env.list index c7bde7b8ba..87f3fc5549 100644 --- a/.env.list +++ b/.env.list @@ -1,5 +1,5 @@ # https://django-environ.readthedocs.io/en/latest/quickstart.html#usage -# https://docs.djangoproject.com/zh-hans/4.1/ref/settings/ +# https://docs.djangoproject.com/zh-hans/4.1/ref/settings/ DEBUG=false DATABASE_URL=mysql://root:@127.0.0.1:3306/archery CACHE_URL=redis://127.0.0.1:6379/0 From 8ae274ccb060ff9ca0ec4f32cb5ad0ef4abcd99f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 8 Aug 2024 18:40:03 +0800 Subject: [PATCH 05/33] =?UTF-8?q?ES=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- archery/settings.py | 2 + requirements.txt | 1 + sql/engines/elasticsearch.py | 266 +++++++++++++++++++++++++++++++++++ sql/models.py | 1 + 4 files changed, 270 insertions(+) create mode 100644 sql/engines/elasticsearch.py diff --git a/archery/settings.py b/archery/settings.py index de45f1dd41..170d61da23 100644 --- a/archery/settings.py +++ b/archery/settings.py @@ -53,6 +53,7 @@ "odps", "cassandra", "doris", + "elasticsearch" ], ), ENABLED_NOTIFIERS=( @@ -101,6 +102,7 @@ "phoenix": {"path": "sql.engines.phoenix:PhoenixEngine"}, "odps": {"path": "sql.engines.odps:ODPSEngine"}, "doris": {"path": "sql.engines.doris:DorisEngine"}, + "elasticsearch": {"path": "sql.engines.elasticsearch:ElasticsearchEngine"}, } ENABLED_NOTIFIERS = env("ENABLED_NOTIFIERS") diff --git a/requirements.txt b/requirements.txt index 70a447bfab..9f1a363b24 100644 --- a/requirements.txt +++ b/requirements.txt @@ -44,3 +44,4 @@ django-cas-ng==4.3.0 cassandra-driver httpx OpenAI +elasticsearch==8.14.0 \ No newline at end of file diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py new file mode 100644 index 0000000000..982eadac9a --- /dev/null +++ b/sql/engines/elasticsearch.py @@ -0,0 +1,266 @@ +# -*- coding: UTF-8 -*- +import re, time +import pymongo +import logging +import traceback +import subprocess +import simplejson as json +import datetime +import tempfile + +from . 
import EngineBase +from .models import ResultSet, ReviewSet, ReviewResult +from common.config import SysConfig + +from elasticsearch import Elasticsearch +from elasticsearch.exceptions import TransportError + + +logger = logging.getLogger("default") + + +class ElasticsearchEngine(EngineBase): + """Elasticsearch 引擎实现""" + + def __init__(self, instance=None): + self.db_separator = "__" # 设置分隔符 + super().__init__(instance=instance) + + def get_connection(self, db_name=None): + if self.conn: + return self.conn + if self.instance: + scheme = "https" if self.is_ssl else "http" + hosts = [ + { + "host": self.host, + "port": self.port, + "scheme": scheme, + "use_ssl": self.is_ssl, + } + ] + http_auth = ( + (self.user, self.password) if self.user and self.password else None + ) + + try: + # 创建 Elasticsearch 连接,高版本有basic_auth + self.conn = Elasticsearch( + hosts=hosts, + http_auth=http_auth, + verify_certs=False, # 关闭证书验证 + ) + except Exception as e: + raise Exception(f"Elasticsearch 连接建立失败: {str(e)}") + if not self.conn: + raise Exception("Elasticsearch 连接无法建立。") + return self.conn + + def test_connection(self): + """测试实例链接是否正常""" + return self.get_all_databases() + + @property + def name(self): + return "elasticsearch" + + @property + def info(self): + return "Elasticsearch 引擎" + + def get_all_databases(self): + """获取所有“数据库”名(从索引名提取),默认提取 __ 前的部分作为数据库名""" + try: + self.get_connection() + # 获取所有的别名,没有别名就是本身。 + indices = self.conn.indices.get_alias(index="*") + database_names = set() + database_names.add("system") # 系统表名使用的库名 + for index_name in indices.keys(): + if self.db_separator in index_name: + db_name = index_name.split(self.db_separator)[0] + database_names.add(db_name) + elif index_name.startswith(".kibana_"): + database_names.add("system_kibana") + elif index_name.startswith(".internal."): + database_names.add("system_internal") + database_names.add("other") # 表名没有__时,使用的库名 + database_names_sorted = sorted(database_names) + return ResultSet(rows=database_names_sorted) + except Exception as e: + logger.error(f"获取数据库时出错:{e}{traceback.format_exc()}") + raise Exception(f"获取数据库时出错: {str(e)}") + + def get_all_tables(self, db_name, **kwargs): + """根据给定的数据库名获取所有相关的表名""" + try: + self.get_connection() + indices = self.conn.indices.get_alias(index="*") + tables = set() + + db_mapping = { + "system_kibana": ".kibana_", + "system_internal": ".internal.", + "system": ".", + "other": "other", + } + # 根据分隔符分隔的库名 + if db_name not in db_mapping: + index_prefix = db_name + tables = [ + index for index in indices.keys() if index.startswith(index_prefix) + ] + else: + # 处理系统表,和other,循环db_mapping.items() 很难实现。 + for index_name in indices.keys(): + if index_name.startswith(".kibana_") | index_name.startswith( + ".kibana-" + ): + if db_name == "system_kibana": + tables.add(index_name) + continue + elif index_name.startswith(".internal."): + if db_name == "system_internal": + tables.add(index_name) + continue + elif index_name.startswith("."): + if db_name == "system": + tables.add(index_name) + continue + elif index_name.startswith(db_name): + tables.add(index_name) + continue + else: + if db_name == "other": + tables.add(index_name) + tables_sorted = sorted(tables) + return ResultSet(rows=tables_sorted) + except Exception as e: + raise Exception(f"获取表列表时出错: {str(e)}") + + def get_all_columns_by_tb(self, db_name, tb_name, **kwargs): + """获取所有字段""" + result_set = ResultSet(full_sql=f"{tb_name}/_mapping") + try: + self.get_connection() + mapping = self.conn.indices.get_mapping(index=tb_name) + properties = ( + 
mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) + ) + # 返回字段名 + result_set.column_list = ["column_name"] + if properties is None: + result_set.rows = [("无")] + else: + result_set.rows = list(properties.keys()) + return result_set + except Exception as e: + raise Exception(f"获取字段时出错: {str(e)}") + + def describe_table(self, db_name, tb_name, **kwargs): + """表结构""" + result_set = ResultSet(full_sql=f"{tb_name}/_mapping") + try: + self.get_connection() + mapping = self.conn.indices.get_mapping(index=tb_name) + properties = ( + mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) + ) + # 创建包含字段名、类型和其他信息的列表结构 + result_set.column_list = ["column_name", "type", "fields"] + if properties is None: + result_set.rows = [("无", "无", "无")] + else: + result_set.rows = [ + ( + column, + details.get("type"), + json.dumps(details.get("fields", {})), + ) + for column, details in properties.items() + ] + return result_set + except Exception as e: + raise Exception(f"获取字段时出错: {str(e)}") + + def query_check(self, db_name=None, sql=""): + """语句检查""" + result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False} + result["msg"] += "语句检查,未实现。" + result["bad_query"] = False + return result + + def filter_sql(self, sql="", limit_num=0): + """过滤 SQL 语句""" + return sql.strip() + + def query( + self, + db_name=None, + sql="", + limit_num=0, + close_conn=True, + parameters=None, + **kwargs, + ): + """执行查询""" + try: + query_string=sql + # 解析查询字符串 + lines = query_string.splitlines() + method_line = lines[0].strip() + query_body = "\n".join(lines[1:]).strip() + # 如果 query_body 为空,使用默认查询体 + if not query_body: + query_body = json.dumps({"query": {"match_all": {}}}) + + # 确保 query_body 是有效的 JSON + try: + json_body = json.loads(query_body) + except json.JSONDecodeError as json_err: + raise ValueError(f"{query_body} 无法转为Json格式。{json_err},") + + # 提取方法和路径 + method, path_with_params = method_line.split(maxsplit=1) + # 确保路径以 '/' 开头 + if not path_with_params.startswith('/'): + path_with_params = '/' + path_with_params + + # 分离路径和查询参数 + path, params_str = path_with_params.split('?', 1) if '?' 
in path_with_params else (path_with_params, "") + params = dict(param.split('=') for param in params_str.split('&') if '=' in param) + + # 提取索引名称 + index = path.split('/')[1] + + # 从参数中提取 filter_path + filter_path = params.get('filter_path', None) + + # 执行搜索查询 + response = self.conn.search( + index=index, + body=json_body, + size=limit_num if limit_num > 0 else 100, # 使用 limit_num 或默认值 5 + filter_path=filter_path + ) + + # 提取查询结果 + hits = response.get("hits", {}).get("hits", []) + results = [{'_id': hit.get('_id'), **hit.get('_source', {})} for hit in hits] + return ResultSet(rows=results) + except Exception as e: + raise Exception(f"执行查询时出错: {str(e)}") + + def query_masking(self, db_name=None, sql="", resultset=None): + """查询结果脱敏""" + return resultset + + def execute_check(self, db_name=None, sql=""): + """执行检查(未实现)""" + return True + + def execute(self, db_name=None, sql="", close_conn=True, parameters=None): + """执行语句""" + raise NotImplementedError("execute 方法未为 Elasticsearch 实现。") + + diff --git a/sql/models.py b/sql/models.py index 4a6caf5063..f72cbdc845 100755 --- a/sql/models.py +++ b/sql/models.py @@ -134,6 +134,7 @@ class Meta: ("goinception", "goInception"), ("cassandra", "Cassandra"), ("doris", "Doris"), + ("elasticsearch", "Elasticsearch"), ) From 2a1ed48928f0b488570c87583a7d3867a6b1f4af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 8 Aug 2024 19:07:22 +0800 Subject: [PATCH 06/33] =?UTF-8?q?=E6=94=AF=E6=8C=81ES?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 46 +++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 982eadac9a..957c747858 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -205,7 +205,8 @@ def query( ): """执行查询""" try: - query_string=sql + result_set = ResultSet(full_sql=sql) + query_string = sql # 解析查询字符串 lines = query_string.splitlines() method_line = lines[0].strip() @@ -219,35 +220,54 @@ def query( json_body = json.loads(query_body) except json.JSONDecodeError as json_err: raise ValueError(f"{query_body} 无法转为Json格式。{json_err},") - + # 提取方法和路径 method, path_with_params = method_line.split(maxsplit=1) # 确保路径以 '/' 开头 - if not path_with_params.startswith('/'): - path_with_params = '/' + path_with_params - + if not path_with_params.startswith("/"): + path_with_params = "/" + path_with_params + # 分离路径和查询参数 - path, params_str = path_with_params.split('?', 1) if '?' in path_with_params else (path_with_params, "") - params = dict(param.split('=') for param in params_str.split('&') if '=' in param) + path, params_str = ( + path_with_params.split("?", 1) + if "?" 
in path_with_params + else (path_with_params, "") + ) + params = dict( + param.split("=") for param in params_str.split("&") if "=" in param + ) # 提取索引名称 - index = path.split('/')[1] + index = path.split("/")[1] # 从参数中提取 filter_path - filter_path = params.get('filter_path', None) + filter_path = params.get("filter_path", None) # 执行搜索查询 response = self.conn.search( index=index, body=json_body, size=limit_num if limit_num > 0 else 100, # 使用 limit_num 或默认值 5 - filter_path=filter_path + filter_path=filter_path, ) # 提取查询结果 hits = response.get("hits", {}).get("hits", []) - results = [{'_id': hit.get('_id'), **hit.get('_source', {})} for hit in hits] - return ResultSet(rows=results) + rows = [ + {"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits + ] + # 如果有结果,获取字段名作为列名 + if rows: + first_row = rows[0] + column_list = list(first_row.keys()) + else: + column_list = [] + + # 构建结果集 + result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 + result_set.column_list = column_list + result_set.affected_rows = len(result_set.rows) + return result_set except Exception as e: raise Exception(f"执行查询时出错: {str(e)}") @@ -262,5 +282,3 @@ def execute_check(self, db_name=None, sql=""): def execute(self, db_name=None, sql="", close_conn=True, parameters=None): """执行语句""" raise NotImplementedError("execute 方法未为 Elasticsearch 实现。") - - From f3c774916320c0d6e878c838347ed0f30c816b85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 8 Aug 2024 19:11:25 +0800 Subject: [PATCH 07/33] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 957c747858..09a68e2a4e 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -253,9 +253,7 @@ def query( # 提取查询结果 hits = response.get("hits", {}).get("hits", []) - rows = [ - {"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits - ] + rows = [{"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits] # 如果有结果,获取字段名作为列名 if rows: first_row = rows[0] From bdc02436036b1c359d5351620e299a63165366b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Thu, 8 Aug 2024 19:12:20 +0800 Subject: [PATCH 08/33] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- archery/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/archery/settings.py b/archery/settings.py index 170d61da23..949b0640fe 100644 --- a/archery/settings.py +++ b/archery/settings.py @@ -53,7 +53,7 @@ "odps", "cassandra", "doris", - "elasticsearch" + "elasticsearch", ], ), ENABLED_NOTIFIERS=( From 6cfd26435f44a9c3c881aab3e23ed9e5c76ea8b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 11:38:36 +0800 Subject: [PATCH 09/33] =?UTF-8?q?=E7=AE=A1=E7=90=86=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 206 +++++++++++++++++++++++++---------- 1 file changed, 150 insertions(+), 56 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 09a68e2a4e..79305d0724 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -19,6 +19,25 @@ logger = logging.getLogger("default") +class 
QueryParamsEs: + def __init__( + self, + index: str, + path: str, + method: str, + size: int, + filter_path: str = None, + query_body: dict = None, + ): + self.index = index + self.path = path + self.method = method + self.filter_path = filter_path + self.size = size + # 确保 query_body 不为 None + self.query_body = query_body if query_body is not None else {} + + class ElasticsearchEngine(EngineBase): """Elasticsearch 引擎实现""" @@ -186,8 +205,18 @@ def describe_table(self, db_name, tb_name, **kwargs): def query_check(self, db_name=None, sql=""): """语句检查""" result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False} - result["msg"] += "语句检查,未实现。" - result["bad_query"] = False + # 使用正则表达式去除开头的空白字符和换行符 + tripped_sql = re.sub(r"^\s+", "", sql).lower() + result["filtered_sql"] = tripped_sql + # 检查是否以 'get' 或 'select' 开头 + if tripped_sql.startswith("get ") or tripped_sql.startswith("select "): + result["msg"] = "语句检查通过。" + result["bad_query"] = False + else: + result["msg"] = ( + "语句检查失败:语句必须以 'get' 或 'select' 开头。示例查询:GET /dmp_iv/_search、select * from dmp__iv limit 10;" + ) + result["bad_query"] = True return result def filter_sql(self, sql="", limit_num=0): @@ -206,69 +235,134 @@ def query( """执行查询""" try: result_set = ResultSet(full_sql=sql) - query_string = sql - # 解析查询字符串 - lines = query_string.splitlines() - method_line = lines[0].strip() - query_body = "\n".join(lines[1:]).strip() - # 如果 query_body 为空,使用默认查询体 - if not query_body: - query_body = json.dumps({"query": {"match_all": {}}}) - - # 确保 query_body 是有效的 JSON - try: - json_body = json.loads(query_body) - except json.JSONDecodeError as json_err: - raise ValueError(f"{query_body} 无法转为Json格式。{json_err},") - - # 提取方法和路径 - method, path_with_params = method_line.split(maxsplit=1) - # 确保路径以 '/' 开头 - if not path_with_params.startswith("/"): - path_with_params = "/" + path_with_params - - # 分离路径和查询参数 - path, params_str = ( - path_with_params.split("?", 1) - if "?" 
in path_with_params - else (path_with_params, "") - ) - params = dict( - param.split("=") for param in params_str.split("&") if "=" in param - ) - - # 提取索引名称 - index = path.split("/")[1] - # 从参数中提取 filter_path - filter_path = params.get("filter_path", None) - - # 执行搜索查询 - response = self.conn.search( - index=index, - body=json_body, - size=limit_num if limit_num > 0 else 100, # 使用 limit_num 或默认值 5 - filter_path=filter_path, - ) + # 解析查询字符串 + query_params = self.parse_es_select_query_to_query_params(sql, limit_num) - # 提取查询结果 - hits = response.get("hits", {}).get("hits", []) - rows = [{"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits] - # 如果有结果,获取字段名作为列名 - if rows: - first_row = rows[0] - column_list = list(first_row.keys()) + #管理查询处理 + if query_params.path.startswith("/_cat/indices/"): + response = self.conn.cat.indices(index=query_params.index, params={"v": "true"}) + response_data=self.parse_cat_indices_response(response.body) + # 如果有数据,设置列名 + if response_data: + result_set.column_list = list(response_data[0].keys()) + result_set.rows = [tuple(row.values()) for row in response_data] + else: + result_set.column_list = [] + result_set.rows = [] + result_set.affected_rows = 0 else: - column_list = [] + # 执行搜索查询 + response = self.conn.search( + index=query_params.index, + body=query_params.query_body, + filter_path=query_params.filter_path, + ) + + # 提取查询结果 + hits = response.get("hits", {}).get("hits", []) + rows = [{"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits] + # 如果有结果,获取字段名作为列名 + if rows: + first_row = rows[0] + column_list = list(first_row.keys()) + else: + column_list = [] - # 构建结果集 - result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 - result_set.column_list = column_list + # 构建结果集 + result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 + result_set.column_list = column_list result_set.affected_rows = len(result_set.rows) return result_set except Exception as e: raise Exception(f"执行查询时出错: {str(e)}") + def parse_cat_indices_response(self,response_text): + """解析cat indices结果""" + # 将响应文本按行分割 + lines = response_text.strip().splitlines() + # 获取列标题 + headers = lines[0].strip().split() + # 解析每一行数据 + indices_info = [] + for line in lines[1:]: + # 按空格分割,并与标题进行配对 + values = line.strip().split(maxsplit=len(headers) - 1) + index_info = dict(zip(headers, values)) + indices_info.append(index_info) + return indices_info + + def parse_es_select_query_to_query_params( + self, search_query_str: str, limit_num: int + ) -> QueryParamsEs: + """解析 search query 字符串为 QueryParamsEs 对象""" + + # 解析查询字符串 + lines = search_query_str.splitlines() + method_line = lines[0].strip() + + query_body = "\n".join(lines[1:]).strip() + # 如果 query_body 为空,使用默认查询体 + if not query_body: + query_body = json.dumps({"query": {"match_all": {}}}) + + # 确保 query_body 是有效的 JSON + try: + json_body = json.loads(query_body) + except json.JSONDecodeError as json_err: + raise ValueError(f"{query_body} 无法转为Json格式。{json_err},") + + # 提取方法和路径 + method, path_with_params = method_line.split(maxsplit=1) + # 确保路径以 '/' 开头 + if not path_with_params.startswith("/"): + path_with_params = "/" + path_with_params + + # 分离路径和查询参数 + path, params_str = ( + path_with_params.split("?", 1) + if "?" 
in path_with_params + else (path_with_params, "") + ) + params = dict( + param.split("=") for param in params_str.split("&") if "=" in param + ) + index_pattern="" + # 判断路径类型并提取索引模式 + if path.startswith("/_cat/indices/"): + # _cat API 路径 + path_parts = path.split("/") + if len(path_parts) > 3: + index_pattern = path_parts[3] + elif path.startswith("/_search"): + # 默认情况,处理常规索引路径 + # 提取索引名称 + path_parts = path.split("/") + if len(path_parts) > 3: + index_pattern = path_parts[1] + + if not index_pattern: + raise Exception("未找到索引名称。") + + # 从参数中提取 filter_path + filter_path = params.get("filter_path", None) + size = limit_num if limit_num > 0 else 100 + # 检查 JSON 中是否已经有 size,如果没有就设置 + if "size" not in json_body: + json_body["size"] = size + + # 构建 QueryParams 对象 + query_params = QueryParamsEs( + index=index_pattern, + path=path_with_params, + method=method, + size=size, + filter_path=filter_path, + query_body=json_body, + ) + + return query_params + def query_masking(self, db_name=None, sql="", resultset=None): """查询结果脱敏""" return resultset From 96ca874704f3cb5663bf8a0b0a66813f50597d2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 11:54:36 +0800 Subject: [PATCH 10/33] =?UTF-8?q?=E7=AE=A1=E7=90=86=E6=9F=A5=E8=AF=A2X?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 39 +++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 79305d0724..7fb4cb56b3 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -24,6 +24,7 @@ def __init__( self, index: str, path: str, + params: str, method: str, size: int, filter_path: str = None, @@ -31,6 +32,7 @@ def __init__( ): self.index = index self.path = path + self.params = params self.method = method self.filter_path = filter_path self.size = size @@ -239,10 +241,12 @@ def query( # 解析查询字符串 query_params = self.parse_es_select_query_to_query_params(sql, limit_num) - #管理查询处理 + # 管理查询处理 if query_params.path.startswith("/_cat/indices/"): - response = self.conn.cat.indices(index=query_params.index, params={"v": "true"}) - response_data=self.parse_cat_indices_response(response.body) + response = self.conn.cat.indices( + index=query_params.index, params=query_params.params + ) + response_data = self.parse_cat_indices_response(response.body) # 如果有数据,设置列名 if response_data: result_set.column_list = list(response_data[0].keys()) @@ -261,7 +265,9 @@ def query( # 提取查询结果 hits = response.get("hits", {}).get("hits", []) - rows = [{"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits] + rows = [ + {"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits + ] # 如果有结果,获取字段名作为列名 if rows: first_row = rows[0] @@ -277,7 +283,7 @@ def query( except Exception as e: raise Exception(f"执行查询时出错: {str(e)}") - def parse_cat_indices_response(self,response_text): + def parse_cat_indices_response(self, response_text): """解析cat indices结果""" # 将响应文本按行分割 lines = response_text.strip().splitlines() @@ -291,7 +297,7 @@ def parse_cat_indices_response(self,response_text): index_info = dict(zip(headers, values)) indices_info.append(index_info) return indices_info - + def parse_es_select_query_to_query_params( self, search_query_str: str, limit_num: int ) -> QueryParamsEs: @@ -324,23 +330,31 @@ def parse_es_select_query_to_query_params( if "?" 
in path_with_params else (path_with_params, "") ) - params = dict( - param.split("=") for param in params_str.split("&") if "=" in param - ) - index_pattern="" + params = {} + for pair in params_str.split("&"): + if "=" in pair: + key, value = pair.split("=", 1) + else: + key = pair + value = "" + params[key] = value + + index_pattern = "" # 判断路径类型并提取索引模式 if path.startswith("/_cat/indices/"): # _cat API 路径 path_parts = path.split("/") if len(path_parts) > 3: index_pattern = path_parts[3] + if not index_pattern: + index_pattern = "*" elif path.startswith("/_search"): # 默认情况,处理常规索引路径 - # 提取索引名称 + # 提取索引名称 path_parts = path.split("/") if len(path_parts) > 3: index_pattern = path_parts[1] - + if not index_pattern: raise Exception("未找到索引名称。") @@ -355,6 +369,7 @@ def parse_es_select_query_to_query_params( query_params = QueryParamsEs( index=index_pattern, path=path_with_params, + params=params, method=method, size=size, filter_path=filter_path, From 948119a2aa33e55e99ffb73ab6d4c3bb5489dec8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 12:04:41 +0800 Subject: [PATCH 11/33] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 7fb4cb56b3..ef9e725645 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -243,6 +243,9 @@ def query( # 管理查询处理 if query_params.path.startswith("/_cat/indices/"): + # v这个参数用显示标题,需要加上。 + if "v" not in query_params.params: + query_params.params["v"] = True response = self.conn.cat.indices( index=query_params.index, params=query_params.params ) @@ -331,14 +334,14 @@ def parse_es_select_query_to_query_params( else (path_with_params, "") ) params = {} - for pair in params_str.split("&"): - if "=" in pair: - key, value = pair.split("=", 1) - else: - key = pair - value = "" - params[key] = value - + if params_str: + for pair in params_str.split("&"): + if "=" in pair: + key, value = pair.split("=", 1) + else: + key = pair + value = "" + params[key] = value index_pattern = "" # 判断路径类型并提取索引模式 if path.startswith("/_cat/indices/"): @@ -348,11 +351,11 @@ def parse_es_select_query_to_query_params( index_pattern = path_parts[3] if not index_pattern: index_pattern = "*" - elif path.startswith("/_search"): + elif "/_search" in path: # 默认情况,处理常规索引路径 # 提取索引名称 path_parts = path.split("/") - if len(path_parts) > 3: + if len(path_parts) > 1: index_pattern = path_parts[1] if not index_pattern: From 60c182256a6505411c2f8ca24d67889d196da8c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 12:11:37 +0800 Subject: [PATCH 12/33] =?UTF-8?q?=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index ef9e725645..83a9ca310b 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -208,10 +208,11 @@ def query_check(self, db_name=None, sql=""): """语句检查""" result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False} # 使用正则表达式去除开头的空白字符和换行符 - tripped_sql = re.sub(r"^\s+", "", sql).lower() + tripped_sql = re.sub(r"^\s+", "", sql) result["filtered_sql"] = tripped_sql + lower_sql = 
tripped_sql.lower() # 检查是否以 'get' 或 'select' 开头 - if tripped_sql.startswith("get ") or tripped_sql.startswith("select "): + if lower_sql.startswith("get ") or lower_sql.startswith("select "): result["msg"] = "语句检查通过。" result["bad_query"] = False else: From 7640b9b7b20c1a385ec5c0272a93c04519ecc84b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 16:04:22 +0800 Subject: [PATCH 13/33] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95X?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 6 ++- sql/engines/test_elasticsearch.py | 89 +++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 sql/engines/test_elasticsearch.py diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 83a9ca310b..70360270b6 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -127,7 +127,7 @@ def get_all_tables(self, db_name, **kwargs): } # 根据分隔符分隔的库名 if db_name not in db_mapping: - index_prefix = db_name + index_prefix = db_name.rstrip(self.db_separator) + self.db_separator tables = [ index for index in indices.keys() if index.startswith(index_prefix) ] @@ -151,6 +151,8 @@ def get_all_tables(self, db_name, **kwargs): elif index_name.startswith(db_name): tables.add(index_name) continue + elif self.db_separator in index_name: + continue else: if db_name == "other": tables.add(index_name) @@ -241,7 +243,7 @@ def query( # 解析查询字符串 query_params = self.parse_es_select_query_to_query_params(sql, limit_num) - + self.get_connection() # 管理查询处理 if query_params.path.startswith("/_cat/indices/"): # v这个参数用显示标题,需要加上。 diff --git a/sql/engines/test_elasticsearch.py b/sql/engines/test_elasticsearch.py new file mode 100644 index 0000000000..3ee8bd07a3 --- /dev/null +++ b/sql/engines/test_elasticsearch.py @@ -0,0 +1,89 @@ +import unittest +from unittest.mock import patch, Mock +from elasticsearch import Elasticsearch +from elasticsearch.exceptions import TransportError +from sql.engines import ResultSet, ReviewSet +from sql.engines.elasticsearch import ElasticsearchEngine, QueryParamsEs +from sql.models import Instance + + +class TestElasticsearchEngine(unittest.TestCase): + def setUp(self): + # 创建一个模拟的 instance 对象,包含必要的属性 + self.mock_instance = Instance() + self.mock_instance.host = "localhost" + self.mock_instance.port = 9200 + self.mock_instance.user = "user" + self.mock_instance.password = "pass" + self.mock_instance.is_ssl = True + + # 初始化 ElasticsearchEngine,传入模拟的 instance + self.engine = ElasticsearchEngine(instance=self.mock_instance) + + @patch("sql.engines.elasticsearch.Elasticsearch") + def test_get_all_databases(self, mockElasticsearch): + mock_conn = Mock() + mock_conn.indices.get_alias.return_value = { + "test__index1": {}, + "test__index2": {}, + ".kibana_1": {}, + ".internal.index": {}, + } + mockElasticsearch.return_value = mock_conn + + result = self.engine.get_all_databases() + expected_result = [ + "other", + "system", + "system_internal", + "system_kibana", + "test", + ] + self.assertEqual(result.rows, expected_result) + + @patch("sql.engines.elasticsearch.Elasticsearch") + def test_get_all_tables(self, mockElasticsearch): + mock_conn = Mock() + mock_conn.indices.get_alias.return_value = { + "test__index1": {}, + "test__index2": {}, + "other_index": {}, + ".kibana_1": {}, + } + mockElasticsearch.return_value = mock_conn + + # Test specific database + result = 
self.engine.get_all_tables(db_name="test") + self.assertEqual(result.rows, ["test__index1", "test__index2"]) + + # Test system_kibana + result = self.engine.get_all_tables(db_name="system_kibana") + self.assertEqual(result.rows, [".kibana_1"]) + + @patch("sql.engines.elasticsearch.Elasticsearch") + def test_query(self, mockElasticsearch): + mock_conn = Mock() + mock_conn.search.return_value = { + "hits": { + "hits": [ + {"_id": "1", "_source": {"field1": "value1", "field2": "value2"}}, + {"_id": "2", "_source": {"field1": "value3", "field2": "value4"}}, + ] + } + } + mockElasticsearch.return_value = mock_conn + + sql = "GET /test_index/_search" + result = self.engine.query(sql=sql) + expected_rows = [("1", "value1", "value2"), ("2", "value3", "value4")] + self.assertEqual(result.rows, expected_rows) + self.assertEqual(result.column_list, ["_id", "field1", "field2"]) + + def test_query_check(self): + valid_sql = "GET /test_index/_search" + result = self.engine.query_check(sql=valid_sql) + self.assertFalse(result["bad_query"]) + + invalid_sql = "PUT /test_index/_doc/1" + result = self.engine.query_check(sql=invalid_sql) + self.assertTrue(result["bad_query"]) From f7c176f19857f8cadf1b29035507302bebba80c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 16:08:53 +0800 Subject: [PATCH 14/33] =?UTF-8?q?=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 70360270b6..11d7995fc0 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -219,7 +219,7 @@ def query_check(self, db_name=None, sql=""): result["bad_query"] = False else: result["msg"] = ( - "语句检查失败:语句必须以 'get' 或 'select' 开头。示例查询:GET /dmp_iv/_search、select * from dmp__iv limit 10;" + "语句检查失败:语句必须以 'get' 或 'select' 开头。示例查询:GET /dmp__iv/_search、select * from dmp__iv limit 10;" ) result["bad_query"] = True return result @@ -389,9 +389,5 @@ def query_masking(self, db_name=None, sql="", resultset=None): return resultset def execute_check(self, db_name=None, sql=""): - """执行检查(未实现)""" + """执行检查""" return True - - def execute(self, db_name=None, sql="", close_conn=True, parameters=None): - """执行语句""" - raise NotImplementedError("execute 方法未为 Elasticsearch 实现。") From 3efa2c46d862f7ea81b1c2c2a60817c55071eb6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 16:23:36 +0800 Subject: [PATCH 15/33] =?UTF-8?q?list,dict=E5=A4=84=E7=90=86=E4=B8=80?= =?UTF-8?q?=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 11d7995fc0..c743c955ad 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -271,9 +271,22 @@ def query( # 提取查询结果 hits = response.get("hits", {}).get("hits", []) - rows = [ - {"_id": hit.get("_id"), **hit.get("_source", {})} for hit in hits - ] + # 处理查询结果,将列表和字典转换为 JSON 字符串 + rows = [] + for hit in hits: + # 获取文档 ID 和 _source 数据 + doc_id = hit.get("_id") + source_data = hit.get("_source", {}) + + # 转换需要转换为 JSON 字符串的字段 + for key, value in source_data.items(): + if isinstance(value, (list, dict)): # 如果字段是列表或字典 + source_data[key] = json.dumps(value) # 转换为 JSON 
字符串 + + # 构建结果行 + row = {"_id": doc_id, **source_data} + rows.append(row) + # 如果有结果,获取字段名作为列名 if rows: first_row = rows[0] From 0d809ac1f7b8cb4c6d2cd4229054b952267c2b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 16:41:13 +0800 Subject: [PATCH 16/33] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index c743c955ad..6ebc258633 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -277,12 +277,12 @@ def query( # 获取文档 ID 和 _source 数据 doc_id = hit.get("_id") source_data = hit.get("_source", {}) - + # 转换需要转换为 JSON 字符串的字段 for key, value in source_data.items(): if isinstance(value, (list, dict)): # 如果字段是列表或字典 source_data[key] = json.dumps(value) # 转换为 JSON 字符串 - + # 构建结果行 row = {"_id": doc_id, **source_data} rows.append(row) @@ -335,7 +335,7 @@ def parse_es_select_query_to_query_params( try: json_body = json.loads(query_body) except json.JSONDecodeError as json_err: - raise ValueError(f"{query_body} 无法转为Json格式。{json_err},") + raise ValueError(f"query_body:{query_body} 无法转为Json格式。{json_err},") # 提取方法和路径 method, path_with_params = method_line.split(maxsplit=1) From 0f866c7596aee5f28cc7b5ab5c0603a24c0cf015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Fri, 9 Aug 2024 17:21:31 +0800 Subject: [PATCH 17/33] =?UTF-8?q?=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E8=A1=A5=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/test_elasticsearch.py | 145 +++++++++++++++++++++++++++++- 1 file changed, 142 insertions(+), 3 deletions(-) diff --git a/sql/engines/test_elasticsearch.py b/sql/engines/test_elasticsearch.py index 3ee8bd07a3..fe6b7ca6cc 100644 --- a/sql/engines/test_elasticsearch.py +++ b/sql/engines/test_elasticsearch.py @@ -1,3 +1,4 @@ +import json import unittest from unittest.mock import patch, Mock from elasticsearch import Elasticsearch @@ -20,6 +21,10 @@ def setUp(self): # 初始化 ElasticsearchEngine,传入模拟的 instance self.engine = ElasticsearchEngine(instance=self.mock_instance) + def test_info_property(self): + # 测试 info 属性是否正确返回描述字符串 + self.assertEqual(self.engine.info, "Elasticsearch 引擎") + @patch("sql.engines.elasticsearch.Elasticsearch") def test_get_all_databases(self, mockElasticsearch): mock_conn = Mock() @@ -66,8 +71,17 @@ def test_query(self, mockElasticsearch): mock_conn.search.return_value = { "hits": { "hits": [ - {"_id": "1", "_source": {"field1": "value1", "field2": "value2"}}, - {"_id": "2", "_source": {"field1": "value3", "field2": "value4"}}, + { + "_id": "1", + "_source": {"field1": "value1", "field2": ["val1", "val2"]}, + }, + { + "_id": "2", + "_source": { + "field1": {"subfield": "value3"}, + "field2": "value4", + }, + }, ] } } @@ -75,10 +89,135 @@ def test_query(self, mockElasticsearch): sql = "GET /test_index/_search" result = self.engine.query(sql=sql) - expected_rows = [("1", "value1", "value2"), ("2", "value3", "value4")] + expected_rows = [ + ("1", "value1", json.dumps(["val1", "val2"])), + ("2", json.dumps({"subfield": "value3"}), "value4"), + ] self.assertEqual(result.rows, expected_rows) self.assertEqual(result.column_list, ["_id", "field1", "field2"]) + @patch("sql.engines.elasticsearch.Elasticsearch") + def test_query_cat_indices(self, mock_elasticsearch): + 
"""test_query_cat_indices""" + mock_conn = Mock() + mock_elasticsearch.return_value = mock_conn + mock_response = Mock() + mock_response.body = "health status index uuid pri rep docs.count docs.deleted store.size pri.store.size dataset.size\nyellow open test__index 3yyJqzgHTJqRkKwhT5Fy7w 3 1 34256 0 4.4mb 4.4mb 4.4mb\nyellow open dmp__iv fzK3nKcpRNunVr5N6gOSsw 3 1 903 0 527.1kb 527.1kb 527.1kb\n" + mock_conn.cat.indices.return_value = mock_response + + sql = "GET /_cat/indices/*?v&s=docs.count:desc" + + # 执行测试的方法 + result = self.engine.query(sql=sql) + + # 验证结果 + expected_columns = [ + "health", + "status", + "index", + "uuid", + "pri", + "rep", + "docs.count", + "docs.deleted", + "store.size", + "pri.store.size", + "dataset.size", + ] + expected_rows = [ + ( + "yellow", + "open", + "test__index", + "3yyJqzgHTJqRkKwhT5Fy7w", + "3", + "1", + "34256", + "0", + "4.4mb", + "4.4mb", + "4.4mb", + ), + ( + "yellow", + "open", + "dmp__iv", + "fzK3nKcpRNunVr5N6gOSsw", + "3", + "1", + "903", + "0", + "527.1kb", + "527.1kb", + "527.1kb", + ), + ] + self.assertEqual(result.column_list, expected_columns) + self.assertEqual(result.rows, expected_rows) + + @patch("sql.engines.elasticsearch.Elasticsearch") + def test_get_all_columns_by_tb(self, mock_elasticsearch): + """测试获取表字段""" + + mock_conn = Mock() + mock_elasticsearch.return_value = mock_conn + + mock_mapping = { + "mappings": { + "properties": { + "field1": {"type": "text"}, + "field2": {"type": "keyword"}, + "field3": {"type": "integer"}, + } + } + } + + mock_conn.indices.get_mapping.return_value = {"test_table": mock_mapping} + + result = self.engine.get_all_columns_by_tb( + db_name="test_db", tb_name="test_table" + ) + + expected_columns = ["column_name"] + expected_rows = ["field1", "field2", "field3"] + + self.assertEqual(result.column_list, expected_columns) + self.assertEqual(result.rows, expected_rows) + + @patch("sql.engines.elasticsearch.Elasticsearch") + def test_describe_table(self, mock_elasticsearch): + """测试表结构""" + + mock_conn = Mock() + mock_elasticsearch.return_value = mock_conn + + mock_mapping = { + "mappings": { + "properties": { + "field1": { + "type": "text", + "fields": {"keyword": {"type": "keyword"}}, + }, + "field2": {"type": "integer"}, + "field3": {"type": "date"}, + } + } + } + mock_conn.indices.get_mapping.return_value = {"test_table": mock_mapping} + + result = self.engine.describe_table(db_name="test_db", tb_name="test_table") + + expected_columns = ["column_name", "type", "fields"] + expected_rows = [ + ("field1", "text", json.dumps({"keyword": {"type": "keyword"}})), + ("field2", "integer", "{}"), + ("field3", "date", "{}"), + ] + + # Assertions + self.assertEqual(result.column_list, expected_columns) + self.assertEqual(result.rows, expected_rows) + def test_query_check(self): valid_sql = "GET /test_index/_search" result = self.engine.query_check(sql=valid_sql) From 238ea474c0bb6bb27a454babcfc79bf37341c09a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Mon, 12 Aug 2024 10:12:15 +0800 Subject: [PATCH 18/33] =?UTF-8?q?True=E6=94=B9=E4=B8=BAtrue=EF=BC=8C=20?= =?UTF-8?q?=E4=BD=BF=E7=94=A8params?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 6ebc258633..96ae68438f 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -27,14 +27,12 @@ def __init__( params: 
str, method: str, size: int, - filter_path: str = None, query_body: dict = None, ): self.index = index self.path = path self.params = params self.method = method - self.filter_path = filter_path self.size = size # 确保 query_body 不为 None self.query_body = query_body if query_body is not None else {} @@ -248,7 +246,7 @@ def query( if query_params.path.startswith("/_cat/indices/"): # v这个参数用显示标题,需要加上。 if "v" not in query_params.params: - query_params.params["v"] = True + query_params.params["v"] = "true" response = self.conn.cat.indices( index=query_params.index, params=query_params.params ) @@ -266,7 +264,7 @@ def query( response = self.conn.search( index=query_params.index, body=query_params.query_body, - filter_path=query_params.filter_path, + params=query_params.params ) # 提取查询结果 @@ -377,8 +375,7 @@ def parse_es_select_query_to_query_params( if not index_pattern: raise Exception("未找到索引名称。") - # 从参数中提取 filter_path - filter_path = params.get("filter_path", None) + size = limit_num if limit_num > 0 else 100 # 检查 JSON 中是否已经有 size,如果没有就设置 if "size" not in json_body: @@ -391,7 +388,6 @@ def parse_es_select_query_to_query_params( params=params, method=method, size=size, - filter_path=filter_path, query_body=json_body, ) From 98f9042ccb5f42d44d341ab75efcce8b0a66b278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Mon, 12 Aug 2024 10:13:15 +0800 Subject: [PATCH 19/33] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 96ae68438f..36006d0d87 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -264,7 +264,7 @@ def query( response = self.conn.search( index=query_params.index, body=query_params.query_body, - params=query_params.params + params=query_params.params, ) # 提取查询结果 @@ -375,7 +375,6 @@ def parse_es_select_query_to_query_params( if not index_pattern: raise Exception("未找到索引名称。") - size = limit_num if limit_num > 0 else 100 # 检查 JSON 中是否已经有 size,如果没有就设置 if "size" not in json_body: From e5885f2e43cd9c40821ada2f113fd46147d9ab26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=99=93=E9=A3=9E?= Date: Tue, 13 Aug 2024 09:59:33 +0800 Subject: [PATCH 20/33] =?UTF-8?q?=E5=B1=9E=E6=80=A7=E6=94=B9=E4=B8=BA=20na?= =?UTF-8?q?me:=20str?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Leo Q --- sql/engines/elasticsearch.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 36006d0d87..e04d993b1c 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -79,13 +79,8 @@ def test_connection(self): """测试实例链接是否正常""" return self.get_all_databases() - @property - def name(self): - return "elasticsearch" - - @property - def info(self): - return "Elasticsearch 引擎" + name: str = "elasticsearch" + info: str = "Elasticsearch 引擎" def get_all_databases(self): """获取所有“数据库”名(从索引名提取),默认提取 __ 前的部分作为数据库名""" From c1ca843078b7122a9083a184a2eb253904294475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 10:11:45 +0800 Subject: [PATCH 21/33] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E5=89=8D=E7=BC=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 
sql/engines/elasticsearch.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index e04d993b1c..1103afe174 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -1,4 +1,5 @@ # -*- coding: UTF-8 -*- +import os import re, time import pymongo import logging @@ -61,13 +62,13 @@ def get_connection(self, db_name=None): http_auth = ( (self.user, self.password) if self.user and self.password else None ) - + self.db_name = (self.db_name or "") + "*" try: # 创建 Elasticsearch 连接,高版本有basic_auth self.conn = Elasticsearch( hosts=hosts, http_auth=http_auth, - verify_certs=False, # 关闭证书验证 + verify_certs=True, # 需要证书验证 ) except Exception as e: raise Exception(f"Elasticsearch 连接建立失败: {str(e)}") @@ -87,9 +88,10 @@ def get_all_databases(self): try: self.get_connection() # 获取所有的别名,没有别名就是本身。 - indices = self.conn.indices.get_alias(index="*") + indices = self.conn.indices.get_alias(index=self.db_name) database_names = set() - database_names.add("system") # 系统表名使用的库名 + if self.db_name == "*": + database_names.add("system") # 系统表名使用的库名 for index_name in indices.keys(): if self.db_separator in index_name: db_name = index_name.split(self.db_separator)[0] @@ -109,7 +111,7 @@ def get_all_tables(self, db_name, **kwargs): """根据给定的数据库名获取所有相关的表名""" try: self.get_connection() - indices = self.conn.indices.get_alias(index="*") + indices = self.conn.indices.get_alias(index=self.db_name) tables = set() db_mapping = { @@ -238,7 +240,7 @@ def query( query_params = self.parse_es_select_query_to_query_params(sql, limit_num) self.get_connection() # 管理查询处理 - if query_params.path.startswith("/_cat/indices/"): + if query_params.path.startswith("/_cat/indices"): # v这个参数用显示标题,需要加上。 if "v" not in query_params.params: query_params.params["v"] = "true" @@ -353,7 +355,7 @@ def parse_es_select_query_to_query_params( params[key] = value index_pattern = "" # 判断路径类型并提取索引模式 - if path.startswith("/_cat/indices/"): + if path.startswith("/_cat/indices"): # _cat API 路径 path_parts = path.split("/") if len(path_parts) > 3: @@ -386,11 +388,3 @@ def parse_es_select_query_to_query_params( ) return query_params - - def query_masking(self, db_name=None, sql="", resultset=None): - """查询结果脱敏""" - return resultset - - def execute_check(self, db_name=None, sql=""): - """执行检查""" - return True From 761bb311d2be523b83fc4947e77c931c2e36e8cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 11:40:50 +0800 Subject: [PATCH 22/33] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9F=BA=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elastic_search_engine_base.py | 367 ++++++++++++++++++++++ sql/engines/elasticsearch.py | 356 +-------------------- sql/engines/test_elasticsearch.py | 2 +- 3 files changed, 376 insertions(+), 349 deletions(-) create mode 100644 sql/engines/elastic_search_engine_base.py diff --git a/sql/engines/elastic_search_engine_base.py b/sql/engines/elastic_search_engine_base.py new file mode 100644 index 0000000000..f524c27a42 --- /dev/null +++ b/sql/engines/elastic_search_engine_base.py @@ -0,0 +1,367 @@ +# -*- coding: UTF-8 -*- +import os +import re, time +from click import command +import logging +import traceback +import simplejson as json +from . 
import EngineBase +from .models import ResultSet, ReviewSet, ReviewResult +from common.config import SysConfig + + +logger = logging.getLogger("default") + + +class QueryParamsSearch: + def __init__( + self, + index: str, + path: str, + params: str, + method: str, + size: int, + query_body: dict = None, + ): + self.index = index + self.path = path + self.params = params + self.method = method + self.size = size + # 确保 query_body 不为 None + self.query_body = query_body if query_body is not None else {} + + +class ElasticsearchEngineBase(EngineBase): + """Elasticsearch、OpenSearch等Search父类实现""" + + def __init__(self, instance=None): + self.db_separator = "__" # 设置分隔符 + # 限制只能2种支持的子类 + self.search_name = ["Elasticsearch", "OpenSearch"] + if self.name not in self.search_name: + raise ValueError( + f"Invalid name: {self.name}. Must be one of {self.search_name}." + ) + super().__init__(instance=instance) + + def get_connection(self, db_name=None): + """返回一个conn实例""" + + def test_connection(self): + """测试实例链接是否正常""" + return self.get_all_databases() + + name: str = "SearchBase" + info: str = "SearchBase 引擎" + + def get_all_databases(self): + """获取所有“数据库”名(从索引名提取),默认提取 __ 前的部分作为数据库名""" + try: + self.get_connection() + # 获取所有的别名,没有别名就是本身。 + indices = self.conn.indices.get_alias(index=self.db_name) + database_names = set() + if self.db_name == "*": + database_names.add("system") # 系统表名使用的库名 + for index_name in indices.keys(): + if self.db_separator in index_name: + db_name = index_name.split(self.db_separator)[0] + database_names.add(db_name) + elif index_name.startswith(".kibana_"): + database_names.add("system_kibana") + elif index_name.startswith(".internal."): + database_names.add("system_internal") + database_names.add("other") # 表名没有__时,使用的库名 + database_names_sorted = sorted(database_names) + return ResultSet(rows=database_names_sorted) + except Exception as e: + logger.error(f"获取数据库时出错:{e}{traceback.format_exc()}") + raise Exception(f"获取数据库时出错: {str(e)}") + + def get_all_tables(self, db_name, **kwargs): + """根据给定的数据库名获取所有相关的表名""" + try: + self.get_connection() + indices = self.conn.indices.get_alias(index=self.db_name) + tables = set() + + db_mapping = { + "system_kibana": ".kibana_", + "system_internal": ".internal.", + "system": ".", + "other": "other", + } + # 根据分隔符分隔的库名 + if db_name not in db_mapping: + index_prefix = db_name.rstrip(self.db_separator) + self.db_separator + tables = [ + index for index in indices.keys() if index.startswith(index_prefix) + ] + else: + # 处理系统表,和other,循环db_mapping.items() 很难实现。 + for index_name in indices.keys(): + if index_name.startswith(".kibana_") | index_name.startswith( + ".kibana-" + ): + if db_name == "system_kibana": + tables.add(index_name) + continue + elif index_name.startswith(".internal."): + if db_name == "system_internal": + tables.add(index_name) + continue + elif index_name.startswith("."): + if db_name == "system": + tables.add(index_name) + continue + elif index_name.startswith(db_name): + tables.add(index_name) + continue + elif self.db_separator in index_name: + continue + else: + if db_name == "other": + tables.add(index_name) + tables_sorted = sorted(tables) + return ResultSet(rows=tables_sorted) + except Exception as e: + raise Exception(f"获取表列表时出错: {str(e)}") + + def get_all_columns_by_tb(self, db_name, tb_name, **kwargs): + """获取所有字段""" + result_set = ResultSet(full_sql=f"{tb_name}/_mapping") + try: + self.get_connection() + mapping = self.conn.indices.get_mapping(index=tb_name) + properties = ( + mapping.get(tb_name, 
{}).get("mappings", {}).get("properties", None) + ) + # 返回字段名 + result_set.column_list = ["column_name"] + if properties is None: + result_set.rows = [("无")] + else: + result_set.rows = list(properties.keys()) + return result_set + except Exception as e: + raise Exception(f"获取字段时出错: {str(e)}") + + def describe_table(self, db_name, tb_name, **kwargs): + """表结构""" + result_set = ResultSet(full_sql=f"{tb_name}/_mapping") + try: + self.get_connection() + mapping = self.conn.indices.get_mapping(index=tb_name) + properties = ( + mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) + ) + # 创建包含字段名、类型和其他信息的列表结构 + result_set.column_list = ["column_name", "type", "fields"] + if properties is None: + result_set.rows = [("无", "无", "无")] + else: + result_set.rows = [ + ( + column, + details.get("type"), + json.dumps(details.get("fields", {})), + ) + for column, details in properties.items() + ] + return result_set + except Exception as e: + raise Exception(f"获取字段时出错: {str(e)}") + + def query_check(self, db_name=None, sql=""): + """语句检查""" + result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False} + # 使用正则表达式去除开头的空白字符和换行符 + tripped_sql = re.sub(r"^\s+", "", sql) + result["filtered_sql"] = tripped_sql + lower_sql = tripped_sql.lower() + # 检查是否以 'get' 或 'select' 开头 + if lower_sql.startswith("get ") or lower_sql.startswith("select "): + result["msg"] = "语句检查通过。" + result["bad_query"] = False + else: + result["msg"] = ( + "语句检查失败:语句必须以 'get' 或 'select' 开头。示例查询:GET /dmp__iv/_search、select * from dmp__iv limit 10;" + ) + result["bad_query"] = True + return result + + def filter_sql(self, sql="", limit_num=0): + """过滤 SQL 语句""" + return sql.strip() + + def query( + self, + db_name=None, + sql="", + limit_num=0, + close_conn=True, + parameters=None, + **kwargs, + ): + """执行查询""" + try: + result_set = ResultSet(full_sql=sql) + + # 解析查询字符串 + query_params = self.parse_es_select_query_to_query_params(sql, limit_num) + self.get_connection() + # 管理查询处理 + if query_params.path.startswith("/_cat/indices"): + # v这个参数用显示标题,需要加上。 + if "v" not in query_params.params: + query_params.params["v"] = "true" + response = self.conn.cat.indices( + index=query_params.index, params=query_params.params + ) + response_body="" + if isinstance(response, str): + response_body=response + else: + response_body=response.body + response_data = self.parse_cat_indices_response(response_body) + # 如果有数据,设置列名 + if response_data: + result_set.column_list = list(response_data[0].keys()) + result_set.rows = [tuple(row.values()) for row in response_data] + else: + result_set.column_list = [] + result_set.rows = [] + result_set.affected_rows = 0 + else: + # 执行搜索查询 + response = self.conn.search( + index=query_params.index, + body=query_params.query_body, + params=query_params.params, + ) + + # 提取查询结果 + hits = response.get("hits", {}).get("hits", []) + # 处理查询结果,将列表和字典转换为 JSON 字符串 + rows = [] + for hit in hits: + # 获取文档 ID 和 _source 数据 + doc_id = hit.get("_id") + source_data = hit.get("_source", {}) + + # 转换需要转换为 JSON 字符串的字段 + for key, value in source_data.items(): + if isinstance(value, (list, dict)): # 如果字段是列表或字典 + source_data[key] = json.dumps(value) # 转换为 JSON 字符串 + + # 构建结果行 + row = {"_id": doc_id, **source_data} + rows.append(row) + + # 如果有结果,获取字段名作为列名 + if rows: + first_row = rows[0] + column_list = list(first_row.keys()) + else: + column_list = [] + + # 构建结果集 + result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 + result_set.column_list = column_list + result_set.affected_rows = 
len(result_set.rows) + return result_set + except Exception as e: + raise Exception(f"执行查询时出错: {str(e)}") + + def parse_cat_indices_response(self, response_text): + """解析cat indices结果""" + # 将响应文本按行分割 + lines = response_text.strip().splitlines() + # 获取列标题 + headers = lines[0].strip().split() + # 解析每一行数据 + indices_info = [] + for line in lines[1:]: + # 按空格分割,并与标题进行配对 + values = line.strip().split(maxsplit=len(headers) - 1) + index_info = dict(zip(headers, values)) + indices_info.append(index_info) + return indices_info + + def parse_es_select_query_to_query_params( + self, search_query_str: str, limit_num: int + ) -> QueryParamsSearch: + """解析 search query 字符串为 QueryParamsSearch 对象""" + + # 解析查询字符串 + lines = search_query_str.splitlines() + method_line = lines[0].strip() + + query_body = "\n".join(lines[1:]).strip() + # 如果 query_body 为空,使用默认查询体 + if not query_body: + query_body = json.dumps({"query": {"match_all": {}}}) + + # 确保 query_body 是有效的 JSON + try: + json_body = json.loads(query_body) + except json.JSONDecodeError as json_err: + raise ValueError(f"query_body:{query_body} 无法转为Json格式。{json_err},") + + # 提取方法和路径 + method, path_with_params = method_line.split(maxsplit=1) + # 确保路径以 '/' 开头 + if not path_with_params.startswith("/"): + path_with_params = "/" + path_with_params + + # 分离路径和查询参数 + path, params_str = ( + path_with_params.split("?", 1) + if "?" in path_with_params + else (path_with_params, "") + ) + params = {} + if params_str: + for pair in params_str.split("&"): + if "=" in pair: + key, value = pair.split("=", 1) + else: + key = pair + value = "" + params[key] = value + index_pattern = "" + # 判断路径类型并提取索引模式 + if path.startswith("/_cat/indices"): + # _cat API 路径 + path_parts = path.split("/") + if len(path_parts) > 3: + index_pattern = path_parts[3] + if not index_pattern: + index_pattern = "*" + elif "/_search" in path: + # 默认情况,处理常规索引路径 + # 提取索引名称 + path_parts = path.split("/") + if len(path_parts) > 1: + index_pattern = path_parts[1] + + if not index_pattern: + raise Exception("未找到索引名称。") + + size = limit_num if limit_num > 0 else 100 + # 检查 JSON 中是否已经有 size,如果没有就设置 + if "size" not in json_body: + json_body["size"] = size + + # 构建 QueryParams 对象 + query_params = QueryParamsSearch( + index=index_pattern, + path=path_with_params, + params=params, + method=method, + size=size, + query_body=json_body, + ) + + return query_params \ No newline at end of file diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 1103afe174..216b7cdd7b 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -1,51 +1,24 @@ # -*- coding: UTF-8 -*- -import os -import re, time -import pymongo -import logging -import traceback -import subprocess -import simplejson as json -import datetime -import tempfile - -from . 
import EngineBase -from .models import ResultSet, ReviewSet, ReviewResult -from common.config import SysConfig +import logging +from sql.engines.elastic_search_engine_base import ElasticsearchEngineBase from elasticsearch import Elasticsearch from elasticsearch.exceptions import TransportError logger = logging.getLogger("default") + - -class QueryParamsEs: - def __init__( - self, - index: str, - path: str, - params: str, - method: str, - size: int, - query_body: dict = None, - ): - self.index = index - self.path = path - self.params = params - self.method = method - self.size = size - # 确保 query_body 不为 None - self.query_body = query_body if query_body is not None else {} - - -class ElasticsearchEngine(EngineBase): +class ElasticsearchEngine(ElasticsearchEngineBase): """Elasticsearch 引擎实现""" def __init__(self, instance=None): - self.db_separator = "__" # 设置分隔符 + # self.db_separator = "__" # 设置分隔符 super().__init__(instance=instance) + name: str = "Elasticsearch" + info: str = "Elasticsearch 引擎" + def get_connection(self, db_name=None): if self.conn: return self.conn @@ -75,316 +48,3 @@ def get_connection(self, db_name=None): if not self.conn: raise Exception("Elasticsearch 连接无法建立。") return self.conn - - def test_connection(self): - """测试实例链接是否正常""" - return self.get_all_databases() - - name: str = "elasticsearch" - info: str = "Elasticsearch 引擎" - - def get_all_databases(self): - """获取所有“数据库”名(从索引名提取),默认提取 __ 前的部分作为数据库名""" - try: - self.get_connection() - # 获取所有的别名,没有别名就是本身。 - indices = self.conn.indices.get_alias(index=self.db_name) - database_names = set() - if self.db_name == "*": - database_names.add("system") # 系统表名使用的库名 - for index_name in indices.keys(): - if self.db_separator in index_name: - db_name = index_name.split(self.db_separator)[0] - database_names.add(db_name) - elif index_name.startswith(".kibana_"): - database_names.add("system_kibana") - elif index_name.startswith(".internal."): - database_names.add("system_internal") - database_names.add("other") # 表名没有__时,使用的库名 - database_names_sorted = sorted(database_names) - return ResultSet(rows=database_names_sorted) - except Exception as e: - logger.error(f"获取数据库时出错:{e}{traceback.format_exc()}") - raise Exception(f"获取数据库时出错: {str(e)}") - - def get_all_tables(self, db_name, **kwargs): - """根据给定的数据库名获取所有相关的表名""" - try: - self.get_connection() - indices = self.conn.indices.get_alias(index=self.db_name) - tables = set() - - db_mapping = { - "system_kibana": ".kibana_", - "system_internal": ".internal.", - "system": ".", - "other": "other", - } - # 根据分隔符分隔的库名 - if db_name not in db_mapping: - index_prefix = db_name.rstrip(self.db_separator) + self.db_separator - tables = [ - index for index in indices.keys() if index.startswith(index_prefix) - ] - else: - # 处理系统表,和other,循环db_mapping.items() 很难实现。 - for index_name in indices.keys(): - if index_name.startswith(".kibana_") | index_name.startswith( - ".kibana-" - ): - if db_name == "system_kibana": - tables.add(index_name) - continue - elif index_name.startswith(".internal."): - if db_name == "system_internal": - tables.add(index_name) - continue - elif index_name.startswith("."): - if db_name == "system": - tables.add(index_name) - continue - elif index_name.startswith(db_name): - tables.add(index_name) - continue - elif self.db_separator in index_name: - continue - else: - if db_name == "other": - tables.add(index_name) - tables_sorted = sorted(tables) - return ResultSet(rows=tables_sorted) - except Exception as e: - raise Exception(f"获取表列表时出错: {str(e)}") - - def 
get_all_columns_by_tb(self, db_name, tb_name, **kwargs): - """获取所有字段""" - result_set = ResultSet(full_sql=f"{tb_name}/_mapping") - try: - self.get_connection() - mapping = self.conn.indices.get_mapping(index=tb_name) - properties = ( - mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) - ) - # 返回字段名 - result_set.column_list = ["column_name"] - if properties is None: - result_set.rows = [("无")] - else: - result_set.rows = list(properties.keys()) - return result_set - except Exception as e: - raise Exception(f"获取字段时出错: {str(e)}") - - def describe_table(self, db_name, tb_name, **kwargs): - """表结构""" - result_set = ResultSet(full_sql=f"{tb_name}/_mapping") - try: - self.get_connection() - mapping = self.conn.indices.get_mapping(index=tb_name) - properties = ( - mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) - ) - # 创建包含字段名、类型和其他信息的列表结构 - result_set.column_list = ["column_name", "type", "fields"] - if properties is None: - result_set.rows = [("无", "无", "无")] - else: - result_set.rows = [ - ( - column, - details.get("type"), - json.dumps(details.get("fields", {})), - ) - for column, details in properties.items() - ] - return result_set - except Exception as e: - raise Exception(f"获取字段时出错: {str(e)}") - - def query_check(self, db_name=None, sql=""): - """语句检查""" - result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False} - # 使用正则表达式去除开头的空白字符和换行符 - tripped_sql = re.sub(r"^\s+", "", sql) - result["filtered_sql"] = tripped_sql - lower_sql = tripped_sql.lower() - # 检查是否以 'get' 或 'select' 开头 - if lower_sql.startswith("get ") or lower_sql.startswith("select "): - result["msg"] = "语句检查通过。" - result["bad_query"] = False - else: - result["msg"] = ( - "语句检查失败:语句必须以 'get' 或 'select' 开头。示例查询:GET /dmp__iv/_search、select * from dmp__iv limit 10;" - ) - result["bad_query"] = True - return result - - def filter_sql(self, sql="", limit_num=0): - """过滤 SQL 语句""" - return sql.strip() - - def query( - self, - db_name=None, - sql="", - limit_num=0, - close_conn=True, - parameters=None, - **kwargs, - ): - """执行查询""" - try: - result_set = ResultSet(full_sql=sql) - - # 解析查询字符串 - query_params = self.parse_es_select_query_to_query_params(sql, limit_num) - self.get_connection() - # 管理查询处理 - if query_params.path.startswith("/_cat/indices"): - # v这个参数用显示标题,需要加上。 - if "v" not in query_params.params: - query_params.params["v"] = "true" - response = self.conn.cat.indices( - index=query_params.index, params=query_params.params - ) - response_data = self.parse_cat_indices_response(response.body) - # 如果有数据,设置列名 - if response_data: - result_set.column_list = list(response_data[0].keys()) - result_set.rows = [tuple(row.values()) for row in response_data] - else: - result_set.column_list = [] - result_set.rows = [] - result_set.affected_rows = 0 - else: - # 执行搜索查询 - response = self.conn.search( - index=query_params.index, - body=query_params.query_body, - params=query_params.params, - ) - - # 提取查询结果 - hits = response.get("hits", {}).get("hits", []) - # 处理查询结果,将列表和字典转换为 JSON 字符串 - rows = [] - for hit in hits: - # 获取文档 ID 和 _source 数据 - doc_id = hit.get("_id") - source_data = hit.get("_source", {}) - - # 转换需要转换为 JSON 字符串的字段 - for key, value in source_data.items(): - if isinstance(value, (list, dict)): # 如果字段是列表或字典 - source_data[key] = json.dumps(value) # 转换为 JSON 字符串 - - # 构建结果行 - row = {"_id": doc_id, **source_data} - rows.append(row) - - # 如果有结果,获取字段名作为列名 - if rows: - first_row = rows[0] - column_list = list(first_row.keys()) - else: - column_list = [] - - # 构建结果集 - 
result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 - result_set.column_list = column_list - result_set.affected_rows = len(result_set.rows) - return result_set - except Exception as e: - raise Exception(f"执行查询时出错: {str(e)}") - - def parse_cat_indices_response(self, response_text): - """解析cat indices结果""" - # 将响应文本按行分割 - lines = response_text.strip().splitlines() - # 获取列标题 - headers = lines[0].strip().split() - # 解析每一行数据 - indices_info = [] - for line in lines[1:]: - # 按空格分割,并与标题进行配对 - values = line.strip().split(maxsplit=len(headers) - 1) - index_info = dict(zip(headers, values)) - indices_info.append(index_info) - return indices_info - - def parse_es_select_query_to_query_params( - self, search_query_str: str, limit_num: int - ) -> QueryParamsEs: - """解析 search query 字符串为 QueryParamsEs 对象""" - - # 解析查询字符串 - lines = search_query_str.splitlines() - method_line = lines[0].strip() - - query_body = "\n".join(lines[1:]).strip() - # 如果 query_body 为空,使用默认查询体 - if not query_body: - query_body = json.dumps({"query": {"match_all": {}}}) - - # 确保 query_body 是有效的 JSON - try: - json_body = json.loads(query_body) - except json.JSONDecodeError as json_err: - raise ValueError(f"query_body:{query_body} 无法转为Json格式。{json_err},") - - # 提取方法和路径 - method, path_with_params = method_line.split(maxsplit=1) - # 确保路径以 '/' 开头 - if not path_with_params.startswith("/"): - path_with_params = "/" + path_with_params - - # 分离路径和查询参数 - path, params_str = ( - path_with_params.split("?", 1) - if "?" in path_with_params - else (path_with_params, "") - ) - params = {} - if params_str: - for pair in params_str.split("&"): - if "=" in pair: - key, value = pair.split("=", 1) - else: - key = pair - value = "" - params[key] = value - index_pattern = "" - # 判断路径类型并提取索引模式 - if path.startswith("/_cat/indices"): - # _cat API 路径 - path_parts = path.split("/") - if len(path_parts) > 3: - index_pattern = path_parts[3] - if not index_pattern: - index_pattern = "*" - elif "/_search" in path: - # 默认情况,处理常规索引路径 - # 提取索引名称 - path_parts = path.split("/") - if len(path_parts) > 1: - index_pattern = path_parts[1] - - if not index_pattern: - raise Exception("未找到索引名称。") - - size = limit_num if limit_num > 0 else 100 - # 检查 JSON 中是否已经有 size,如果没有就设置 - if "size" not in json_body: - json_body["size"] = size - - # 构建 QueryParams 对象 - query_params = QueryParamsEs( - index=index_pattern, - path=path_with_params, - params=params, - method=method, - size=size, - query_body=json_body, - ) - - return query_params diff --git a/sql/engines/test_elasticsearch.py b/sql/engines/test_elasticsearch.py index fe6b7ca6cc..44bfda0df1 100644 --- a/sql/engines/test_elasticsearch.py +++ b/sql/engines/test_elasticsearch.py @@ -4,7 +4,7 @@ from elasticsearch import Elasticsearch from elasticsearch.exceptions import TransportError from sql.engines import ResultSet, ReviewSet -from sql.engines.elasticsearch import ElasticsearchEngine, QueryParamsEs +from sql.engines.elasticsearch import ElasticsearchEngine from sql.models import Instance From af41039dd47879c804d1f92a63ed7e4465ebf3a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 11:44:20 +0800 Subject: [PATCH 23/33] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elastic_search_engine_base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/engines/elastic_search_engine_base.py b/sql/engines/elastic_search_engine_base.py index f524c27a42..54f4919e24 
100644 --- a/sql/engines/elastic_search_engine_base.py +++ b/sql/engines/elastic_search_engine_base.py @@ -219,11 +219,11 @@ def query( response = self.conn.cat.indices( index=query_params.index, params=query_params.params ) - response_body="" + response_body = "" if isinstance(response, str): - response_body=response + response_body = response else: - response_body=response.body + response_body = response.body response_data = self.parse_cat_indices_response(response_body) # 如果有数据,设置列名 if response_data: @@ -364,4 +364,4 @@ def parse_es_select_query_to_query_params( query_body=json_body, ) - return query_params \ No newline at end of file + return query_params From 3e81453d311bf0edb52c149e658a1d37eb3b9843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 11:46:53 +0800 Subject: [PATCH 24/33] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 216b7cdd7b..72895704ac 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -7,7 +7,7 @@ logger = logging.getLogger("default") - + class ElasticsearchEngine(ElasticsearchEngineBase): """Elasticsearch 引擎实现""" From dfbf9f173f17661dfa0b5cf97fd6080dded3b84d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 11:52:08 +0800 Subject: [PATCH 25/33] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 72895704ac..c42f9488da 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -13,7 +13,6 @@ class ElasticsearchEngine(ElasticsearchEngineBase): """Elasticsearch 引擎实现""" def __init__(self, instance=None): - # self.db_separator = "__" # 设置分隔符 super().__init__(instance=instance) name: str = "Elasticsearch" From 7252cdbb27b40920374bb99cbe9e16923ca60c95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 12:07:31 +0800 Subject: [PATCH 26/33] =?UTF-8?q?=E5=88=A0=E9=99=A4test=5Finfo=5Fproperty?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/test_elasticsearch.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/engines/test_elasticsearch.py b/sql/engines/test_elasticsearch.py index 44bfda0df1..025bc6277a 100644 --- a/sql/engines/test_elasticsearch.py +++ b/sql/engines/test_elasticsearch.py @@ -21,10 +21,6 @@ def setUp(self): # 初始化 ElasticsearchEngine,传入模拟的 instance self.engine = ElasticsearchEngine(instance=self.mock_instance) - def test_info_property(self): - # 测试 info 属性是否正确返回描述字符串 - self.assertEqual(self.engine.info, "Elasticsearch 引擎") - @patch("sql.engines.elasticsearch.Elasticsearch") def test_get_all_databases(self, mockElasticsearch): mock_conn = Mock() From 8da8730f6be3043b0d0d94c17aac56e98aa2adae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 13:38:30 +0800 Subject: [PATCH 27/33] =?UTF-8?q?=E7=B1=BB=E5=90=88=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elastic_search_engine_base.py | 367 ---------------------- sql/engines/elasticsearch.py | 366 
++++++++++++++++++++- 2 files changed, 364 insertions(+), 369 deletions(-) delete mode 100644 sql/engines/elastic_search_engine_base.py diff --git a/sql/engines/elastic_search_engine_base.py b/sql/engines/elastic_search_engine_base.py deleted file mode 100644 index 54f4919e24..0000000000 --- a/sql/engines/elastic_search_engine_base.py +++ /dev/null @@ -1,367 +0,0 @@ -# -*- coding: UTF-8 -*- -import os -import re, time -from click import command -import logging -import traceback -import simplejson as json -from . import EngineBase -from .models import ResultSet, ReviewSet, ReviewResult -from common.config import SysConfig - - -logger = logging.getLogger("default") - - -class QueryParamsSearch: - def __init__( - self, - index: str, - path: str, - params: str, - method: str, - size: int, - query_body: dict = None, - ): - self.index = index - self.path = path - self.params = params - self.method = method - self.size = size - # 确保 query_body 不为 None - self.query_body = query_body if query_body is not None else {} - - -class ElasticsearchEngineBase(EngineBase): - """Elasticsearch、OpenSearch等Search父类实现""" - - def __init__(self, instance=None): - self.db_separator = "__" # 设置分隔符 - # 限制只能2种支持的子类 - self.search_name = ["Elasticsearch", "OpenSearch"] - if self.name not in self.search_name: - raise ValueError( - f"Invalid name: {self.name}. Must be one of {self.search_name}." - ) - super().__init__(instance=instance) - - def get_connection(self, db_name=None): - """返回一个conn实例""" - - def test_connection(self): - """测试实例链接是否正常""" - return self.get_all_databases() - - name: str = "SearchBase" - info: str = "SearchBase 引擎" - - def get_all_databases(self): - """获取所有“数据库”名(从索引名提取),默认提取 __ 前的部分作为数据库名""" - try: - self.get_connection() - # 获取所有的别名,没有别名就是本身。 - indices = self.conn.indices.get_alias(index=self.db_name) - database_names = set() - if self.db_name == "*": - database_names.add("system") # 系统表名使用的库名 - for index_name in indices.keys(): - if self.db_separator in index_name: - db_name = index_name.split(self.db_separator)[0] - database_names.add(db_name) - elif index_name.startswith(".kibana_"): - database_names.add("system_kibana") - elif index_name.startswith(".internal."): - database_names.add("system_internal") - database_names.add("other") # 表名没有__时,使用的库名 - database_names_sorted = sorted(database_names) - return ResultSet(rows=database_names_sorted) - except Exception as e: - logger.error(f"获取数据库时出错:{e}{traceback.format_exc()}") - raise Exception(f"获取数据库时出错: {str(e)}") - - def get_all_tables(self, db_name, **kwargs): - """根据给定的数据库名获取所有相关的表名""" - try: - self.get_connection() - indices = self.conn.indices.get_alias(index=self.db_name) - tables = set() - - db_mapping = { - "system_kibana": ".kibana_", - "system_internal": ".internal.", - "system": ".", - "other": "other", - } - # 根据分隔符分隔的库名 - if db_name not in db_mapping: - index_prefix = db_name.rstrip(self.db_separator) + self.db_separator - tables = [ - index for index in indices.keys() if index.startswith(index_prefix) - ] - else: - # 处理系统表,和other,循环db_mapping.items() 很难实现。 - for index_name in indices.keys(): - if index_name.startswith(".kibana_") | index_name.startswith( - ".kibana-" - ): - if db_name == "system_kibana": - tables.add(index_name) - continue - elif index_name.startswith(".internal."): - if db_name == "system_internal": - tables.add(index_name) - continue - elif index_name.startswith("."): - if db_name == "system": - tables.add(index_name) - continue - elif index_name.startswith(db_name): - tables.add(index_name) - continue - elif 
self.db_separator in index_name: - continue - else: - if db_name == "other": - tables.add(index_name) - tables_sorted = sorted(tables) - return ResultSet(rows=tables_sorted) - except Exception as e: - raise Exception(f"获取表列表时出错: {str(e)}") - - def get_all_columns_by_tb(self, db_name, tb_name, **kwargs): - """获取所有字段""" - result_set = ResultSet(full_sql=f"{tb_name}/_mapping") - try: - self.get_connection() - mapping = self.conn.indices.get_mapping(index=tb_name) - properties = ( - mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) - ) - # 返回字段名 - result_set.column_list = ["column_name"] - if properties is None: - result_set.rows = [("无")] - else: - result_set.rows = list(properties.keys()) - return result_set - except Exception as e: - raise Exception(f"获取字段时出错: {str(e)}") - - def describe_table(self, db_name, tb_name, **kwargs): - """表结构""" - result_set = ResultSet(full_sql=f"{tb_name}/_mapping") - try: - self.get_connection() - mapping = self.conn.indices.get_mapping(index=tb_name) - properties = ( - mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) - ) - # 创建包含字段名、类型和其他信息的列表结构 - result_set.column_list = ["column_name", "type", "fields"] - if properties is None: - result_set.rows = [("无", "无", "无")] - else: - result_set.rows = [ - ( - column, - details.get("type"), - json.dumps(details.get("fields", {})), - ) - for column, details in properties.items() - ] - return result_set - except Exception as e: - raise Exception(f"获取字段时出错: {str(e)}") - - def query_check(self, db_name=None, sql=""): - """语句检查""" - result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False} - # 使用正则表达式去除开头的空白字符和换行符 - tripped_sql = re.sub(r"^\s+", "", sql) - result["filtered_sql"] = tripped_sql - lower_sql = tripped_sql.lower() - # 检查是否以 'get' 或 'select' 开头 - if lower_sql.startswith("get ") or lower_sql.startswith("select "): - result["msg"] = "语句检查通过。" - result["bad_query"] = False - else: - result["msg"] = ( - "语句检查失败:语句必须以 'get' 或 'select' 开头。示例查询:GET /dmp__iv/_search、select * from dmp__iv limit 10;" - ) - result["bad_query"] = True - return result - - def filter_sql(self, sql="", limit_num=0): - """过滤 SQL 语句""" - return sql.strip() - - def query( - self, - db_name=None, - sql="", - limit_num=0, - close_conn=True, - parameters=None, - **kwargs, - ): - """执行查询""" - try: - result_set = ResultSet(full_sql=sql) - - # 解析查询字符串 - query_params = self.parse_es_select_query_to_query_params(sql, limit_num) - self.get_connection() - # 管理查询处理 - if query_params.path.startswith("/_cat/indices"): - # v这个参数用显示标题,需要加上。 - if "v" not in query_params.params: - query_params.params["v"] = "true" - response = self.conn.cat.indices( - index=query_params.index, params=query_params.params - ) - response_body = "" - if isinstance(response, str): - response_body = response - else: - response_body = response.body - response_data = self.parse_cat_indices_response(response_body) - # 如果有数据,设置列名 - if response_data: - result_set.column_list = list(response_data[0].keys()) - result_set.rows = [tuple(row.values()) for row in response_data] - else: - result_set.column_list = [] - result_set.rows = [] - result_set.affected_rows = 0 - else: - # 执行搜索查询 - response = self.conn.search( - index=query_params.index, - body=query_params.query_body, - params=query_params.params, - ) - - # 提取查询结果 - hits = response.get("hits", {}).get("hits", []) - # 处理查询结果,将列表和字典转换为 JSON 字符串 - rows = [] - for hit in hits: - # 获取文档 ID 和 _source 数据 - doc_id = hit.get("_id") - source_data = hit.get("_source", {}) - - # 转换需要转换为 
JSON 字符串的字段 - for key, value in source_data.items(): - if isinstance(value, (list, dict)): # 如果字段是列表或字典 - source_data[key] = json.dumps(value) # 转换为 JSON 字符串 - - # 构建结果行 - row = {"_id": doc_id, **source_data} - rows.append(row) - - # 如果有结果,获取字段名作为列名 - if rows: - first_row = rows[0] - column_list = list(first_row.keys()) - else: - column_list = [] - - # 构建结果集 - result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 - result_set.column_list = column_list - result_set.affected_rows = len(result_set.rows) - return result_set - except Exception as e: - raise Exception(f"执行查询时出错: {str(e)}") - - def parse_cat_indices_response(self, response_text): - """解析cat indices结果""" - # 将响应文本按行分割 - lines = response_text.strip().splitlines() - # 获取列标题 - headers = lines[0].strip().split() - # 解析每一行数据 - indices_info = [] - for line in lines[1:]: - # 按空格分割,并与标题进行配对 - values = line.strip().split(maxsplit=len(headers) - 1) - index_info = dict(zip(headers, values)) - indices_info.append(index_info) - return indices_info - - def parse_es_select_query_to_query_params( - self, search_query_str: str, limit_num: int - ) -> QueryParamsSearch: - """解析 search query 字符串为 QueryParamsSearch 对象""" - - # 解析查询字符串 - lines = search_query_str.splitlines() - method_line = lines[0].strip() - - query_body = "\n".join(lines[1:]).strip() - # 如果 query_body 为空,使用默认查询体 - if not query_body: - query_body = json.dumps({"query": {"match_all": {}}}) - - # 确保 query_body 是有效的 JSON - try: - json_body = json.loads(query_body) - except json.JSONDecodeError as json_err: - raise ValueError(f"query_body:{query_body} 无法转为Json格式。{json_err},") - - # 提取方法和路径 - method, path_with_params = method_line.split(maxsplit=1) - # 确保路径以 '/' 开头 - if not path_with_params.startswith("/"): - path_with_params = "/" + path_with_params - - # 分离路径和查询参数 - path, params_str = ( - path_with_params.split("?", 1) - if "?" in path_with_params - else (path_with_params, "") - ) - params = {} - if params_str: - for pair in params_str.split("&"): - if "=" in pair: - key, value = pair.split("=", 1) - else: - key = pair - value = "" - params[key] = value - index_pattern = "" - # 判断路径类型并提取索引模式 - if path.startswith("/_cat/indices"): - # _cat API 路径 - path_parts = path.split("/") - if len(path_parts) > 3: - index_pattern = path_parts[3] - if not index_pattern: - index_pattern = "*" - elif "/_search" in path: - # 默认情况,处理常规索引路径 - # 提取索引名称 - path_parts = path.split("/") - if len(path_parts) > 1: - index_pattern = path_parts[1] - - if not index_pattern: - raise Exception("未找到索引名称。") - - size = limit_num if limit_num > 0 else 100 - # 检查 JSON 中是否已经有 size,如果没有就设置 - if "size" not in json_body: - json_body["size"] = size - - # 构建 QueryParams 对象 - query_params = QueryParamsSearch( - index=index_pattern, - path=path_with_params, - params=params, - method=method, - size=size, - query_body=json_body, - ) - - return query_params diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index c42f9488da..1920123d3e 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -1,7 +1,15 @@ # -*- coding: UTF-8 -*- - +import os +import re, time +from click import command +import logging +import traceback +import simplejson as json +from . 
import EngineBase +from .models import ResultSet, ReviewSet, ReviewResult +from common.config import SysConfig import logging -from sql.engines.elastic_search_engine_base import ElasticsearchEngineBase + from elasticsearch import Elasticsearch from elasticsearch.exceptions import TransportError @@ -9,6 +17,360 @@ logger = logging.getLogger("default") +class QueryParamsSearch: + def __init__( + self, + index: str, + path: str, + params: str, + method: str, + size: int, + query_body: dict = None, + ): + self.index = index + self.path = path + self.params = params + self.method = method + self.size = size + # 确保 query_body 不为 None + self.query_body = query_body if query_body is not None else {} + + +class ElasticsearchEngineBase(EngineBase): + """Elasticsearch、OpenSearch等Search父类实现""" + + def __init__(self, instance=None): + self.db_separator = "__" # 设置分隔符 + # 限制只能2种支持的子类 + self.search_name = ["Elasticsearch", "OpenSearch"] + if self.name not in self.search_name: + raise ValueError( + f"Invalid name: {self.name}. Must be one of {self.search_name}." + ) + super().__init__(instance=instance) + + def get_connection(self, db_name=None): + """返回一个conn实例""" + + def test_connection(self): + """测试实例链接是否正常""" + return self.get_all_databases() + + name: str = "SearchBase" + info: str = "SearchBase 引擎" + + def get_all_databases(self): + """获取所有“数据库”名(从索引名提取),默认提取 __ 前的部分作为数据库名""" + try: + self.get_connection() + # 获取所有的别名,没有别名就是本身。 + indices = self.conn.indices.get_alias(index=self.db_name) + database_names = set() + if self.db_name == "*": + database_names.add("system") # 系统表名使用的库名 + for index_name in indices.keys(): + if self.db_separator in index_name: + db_name = index_name.split(self.db_separator)[0] + database_names.add(db_name) + elif index_name.startswith(".kibana_"): + database_names.add("system_kibana") + elif index_name.startswith(".internal."): + database_names.add("system_internal") + database_names.add("other") # 表名没有__时,使用的库名 + database_names_sorted = sorted(database_names) + return ResultSet(rows=database_names_sorted) + except Exception as e: + logger.error(f"获取数据库时出错:{e}{traceback.format_exc()}") + raise Exception(f"获取数据库时出错: {str(e)}") + + def get_all_tables(self, db_name, **kwargs): + """根据给定的数据库名获取所有相关的表名""" + try: + self.get_connection() + indices = self.conn.indices.get_alias(index=self.db_name) + tables = set() + + db_mapping = { + "system_kibana": ".kibana_", + "system_internal": ".internal.", + "system": ".", + "other": "other", + } + # 根据分隔符分隔的库名 + if db_name not in db_mapping: + index_prefix = db_name.rstrip(self.db_separator) + self.db_separator + tables = [ + index for index in indices.keys() if index.startswith(index_prefix) + ] + else: + # 处理系统表,和other,循环db_mapping.items() 很难实现。 + for index_name in indices.keys(): + if index_name.startswith(".kibana_") | index_name.startswith( + ".kibana-" + ): + if db_name == "system_kibana": + tables.add(index_name) + continue + elif index_name.startswith(".internal."): + if db_name == "system_internal": + tables.add(index_name) + continue + elif index_name.startswith("."): + if db_name == "system": + tables.add(index_name) + continue + elif index_name.startswith(db_name): + tables.add(index_name) + continue + elif self.db_separator in index_name: + continue + else: + if db_name == "other": + tables.add(index_name) + tables_sorted = sorted(tables) + return ResultSet(rows=tables_sorted) + except Exception as e: + raise Exception(f"获取表列表时出错: {str(e)}") + + def get_all_columns_by_tb(self, db_name, tb_name, **kwargs): + """获取所有字段""" + 
result_set = ResultSet(full_sql=f"{tb_name}/_mapping") + try: + self.get_connection() + mapping = self.conn.indices.get_mapping(index=tb_name) + properties = ( + mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) + ) + # 返回字段名 + result_set.column_list = ["column_name"] + if properties is None: + result_set.rows = [("无")] + else: + result_set.rows = list(properties.keys()) + return result_set + except Exception as e: + raise Exception(f"获取字段时出错: {str(e)}") + + def describe_table(self, db_name, tb_name, **kwargs): + """表结构""" + result_set = ResultSet(full_sql=f"{tb_name}/_mapping") + try: + self.get_connection() + mapping = self.conn.indices.get_mapping(index=tb_name) + properties = ( + mapping.get(tb_name, {}).get("mappings", {}).get("properties", None) + ) + # 创建包含字段名、类型和其他信息的列表结构 + result_set.column_list = ["column_name", "type", "fields"] + if properties is None: + result_set.rows = [("无", "无", "无")] + else: + result_set.rows = [ + ( + column, + details.get("type"), + json.dumps(details.get("fields", {})), + ) + for column, details in properties.items() + ] + return result_set + except Exception as e: + raise Exception(f"获取字段时出错: {str(e)}") + + def query_check(self, db_name=None, sql=""): + """语句检查""" + result = {"msg": "", "bad_query": False, "filtered_sql": sql, "has_star": False} + # 使用正则表达式去除开头的空白字符和换行符 + tripped_sql = re.sub(r"^\s+", "", sql) + result["filtered_sql"] = tripped_sql + lower_sql = tripped_sql.lower() + # 检查是否以 'get' 或 'select' 开头 + if lower_sql.startswith("get ") or lower_sql.startswith("select "): + result["msg"] = "语句检查通过。" + result["bad_query"] = False + else: + result["msg"] = ( + "语句检查失败:语句必须以 'get' 或 'select' 开头。示例查询:GET /dmp__iv/_search、select * from dmp__iv limit 10;" + ) + result["bad_query"] = True + return result + + def filter_sql(self, sql="", limit_num=0): + """过滤 SQL 语句""" + return sql.strip() + + def query( + self, + db_name=None, + sql="", + limit_num=0, + close_conn=True, + parameters=None, + **kwargs, + ): + """执行查询""" + try: + result_set = ResultSet(full_sql=sql) + + # 解析查询字符串 + query_params = self.parse_es_select_query_to_query_params(sql, limit_num) + self.get_connection() + # 管理查询处理 + if query_params.path.startswith("/_cat/indices"): + # v这个参数用显示标题,需要加上。 + if "v" not in query_params.params: + query_params.params["v"] = "true" + response = self.conn.cat.indices( + index=query_params.index, params=query_params.params + ) + response_body = "" + if isinstance(response, str): + response_body = response + else: + response_body = response.body + response_data = self.parse_cat_indices_response(response_body) + # 如果有数据,设置列名 + if response_data: + result_set.column_list = list(response_data[0].keys()) + result_set.rows = [tuple(row.values()) for row in response_data] + else: + result_set.column_list = [] + result_set.rows = [] + result_set.affected_rows = 0 + else: + # 执行搜索查询 + response = self.conn.search( + index=query_params.index, + body=query_params.query_body, + params=query_params.params, + ) + + # 提取查询结果 + hits = response.get("hits", {}).get("hits", []) + # 处理查询结果,将列表和字典转换为 JSON 字符串 + rows = [] + for hit in hits: + # 获取文档 ID 和 _source 数据 + doc_id = hit.get("_id") + source_data = hit.get("_source", {}) + + # 转换需要转换为 JSON 字符串的字段 + for key, value in source_data.items(): + if isinstance(value, (list, dict)): # 如果字段是列表或字典 + source_data[key] = json.dumps(value) # 转换为 JSON 字符串 + + # 构建结果行 + row = {"_id": doc_id, **source_data} + rows.append(row) + + # 如果有结果,获取字段名作为列名 + if rows: + first_row = rows[0] + column_list = list(first_row.keys()) + 
else: + column_list = [] + + # 构建结果集 + result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 + result_set.column_list = column_list + result_set.affected_rows = len(result_set.rows) + return result_set + except Exception as e: + raise Exception(f"执行查询时出错: {str(e)}") + + def parse_cat_indices_response(self, response_text): + """解析cat indices结果""" + # 将响应文本按行分割 + lines = response_text.strip().splitlines() + # 获取列标题 + headers = lines[0].strip().split() + # 解析每一行数据 + indices_info = [] + for line in lines[1:]: + # 按空格分割,并与标题进行配对 + values = line.strip().split(maxsplit=len(headers) - 1) + index_info = dict(zip(headers, values)) + indices_info.append(index_info) + return indices_info + + def parse_es_select_query_to_query_params( + self, search_query_str: str, limit_num: int + ) -> QueryParamsSearch: + """解析 search query 字符串为 QueryParamsSearch 对象""" + + # 解析查询字符串 + lines = search_query_str.splitlines() + method_line = lines[0].strip() + + query_body = "\n".join(lines[1:]).strip() + # 如果 query_body 为空,使用默认查询体 + if not query_body: + query_body = json.dumps({"query": {"match_all": {}}}) + + # 确保 query_body 是有效的 JSON + try: + json_body = json.loads(query_body) + except json.JSONDecodeError as json_err: + raise ValueError(f"query_body:{query_body} 无法转为Json格式。{json_err},") + + # 提取方法和路径 + method, path_with_params = method_line.split(maxsplit=1) + # 确保路径以 '/' 开头 + if not path_with_params.startswith("/"): + path_with_params = "/" + path_with_params + + # 分离路径和查询参数 + path, params_str = ( + path_with_params.split("?", 1) + if "?" in path_with_params + else (path_with_params, "") + ) + params = {} + if params_str: + for pair in params_str.split("&"): + if "=" in pair: + key, value = pair.split("=", 1) + else: + key = pair + value = "" + params[key] = value + index_pattern = "" + # 判断路径类型并提取索引模式 + if path.startswith("/_cat/indices"): + # _cat API 路径 + path_parts = path.split("/") + if len(path_parts) > 3: + index_pattern = path_parts[3] + if not index_pattern: + index_pattern = "*" + elif "/_search" in path: + # 默认情况,处理常规索引路径 + # 提取索引名称 + path_parts = path.split("/") + if len(path_parts) > 1: + index_pattern = path_parts[1] + + if not index_pattern: + raise Exception("未找到索引名称。") + + size = limit_num if limit_num > 0 else 100 + # 检查 JSON 中是否已经有 size,如果没有就设置 + if "size" not in json_body: + json_body["size"] = size + + # 构建 QueryParams 对象 + query_params = QueryParamsSearch( + index=index_pattern, + path=path_with_params, + params=params, + method=method, + size=size, + query_body=json_body, + ) + + return query_params + + class ElasticsearchEngine(ElasticsearchEngineBase): """Elasticsearch 引擎实现""" From e13e93fd71a116189fa3825b1e79010e77cbe162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=99=93=E9=A3=9E?= Date: Tue, 13 Aug 2024 17:20:57 +0800 Subject: [PATCH 28/33] Update requirements.txt Co-authored-by: Leo Q --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 9f1a363b24..e5e424fae6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -44,4 +44,4 @@ django-cas-ng==4.3.0 cassandra-driver httpx OpenAI -elasticsearch==8.14.0 \ No newline at end of file +elasticsearch==8.14.0 From e5eb8dc244c11225f0c63a247d837499ff24e801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 17:21:10 +0800 Subject: [PATCH 29/33] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=A0=E7=94=A8?= =?UTF-8?q?=E5=BC=95=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 
sql/engines/elasticsearch.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 1920123d3e..8f04308159 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -1,7 +1,4 @@ # -*- coding: UTF-8 -*- -import os -import re, time -from click import command import logging import traceback import simplejson as json From 2691ac51992110fecfdde7cd85e34564f3637214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 17:33:29 +0800 Subject: [PATCH 30/33] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=A0=E7=94=A8?= =?UTF-8?q?=E7=9A=84=E5=BC=95=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/elasticsearch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index 8f04308159..7a08cf40f5 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -1,5 +1,6 @@ # -*- coding: UTF-8 -*- import logging +import re import traceback import simplejson as json from . import EngineBase From 7d08741668f6024800c3b34662a3916afab1cb58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 17:57:51 +0800 Subject: [PATCH 31/33] elasticsearch==8.14.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index e5e424fae6..9f1a363b24 100644 --- a/requirements.txt +++ b/requirements.txt @@ -44,4 +44,4 @@ django-cas-ng==4.3.0 cassandra-driver httpx OpenAI -elasticsearch==8.14.0 +elasticsearch==8.14.0 \ No newline at end of file From 42d751b61d34237ca405ae6a319a293b5d6f20dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 17:58:37 +0800 Subject: [PATCH 32/33] elasticsearch==8.14.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 9f1a363b24..e5e424fae6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -44,4 +44,4 @@ django-cas-ng==4.3.0 cassandra-driver httpx OpenAI -elasticsearch==8.14.0 \ No newline at end of file +elasticsearch==8.14.0 From d2ed4de46bcf9e2d2ed328d11332bb97494e1f5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E9=A3=9E?= Date: Tue, 13 Aug 2024 18:32:51 +0800 Subject: [PATCH 33/33] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=96=B0=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index e5e424fae6..9555e008b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,3 +45,4 @@ cassandra-driver httpx OpenAI elasticsearch==8.14.0 +
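
A minimal usage sketch of the statement parser that the patches above consolidate into ElasticsearchEngineBase. It assumes the merged sql/engines/elasticsearch.py and the elasticsearch client package are importable, and that EngineBase tolerates being constructed without an Instance (its default signature suggests so); the dmp__iv index name simply mirrors the example quoted in query_check, and no live cluster is needed because parse_es_select_query_to_query_params never touches the connection.

    # Illustrative sketch only, not part of the patch series.
    from sql.engines.elasticsearch import ElasticsearchEngine

    # Constructed without an Instance: enough for the offline parsing step,
    # which only splits the "GET /index/_search?..." line and the JSON body.
    engine = ElasticsearchEngine()

    statement = 'GET /dmp__iv/_search?filter_path=hits.hits\n{"query": {"match_all": {}}}'
    qp = engine.parse_es_select_query_to_query_params(statement, limit_num=10)

    print(qp.method)      # GET
    print(qp.index)       # dmp__iv
    print(qp.params)      # {'filter_path': 'hits.hits'}
    print(qp.query_body)  # {'query': {'match_all': {}}, 'size': 10}  <- size injected from limit_num

Keeping this parsing logic in ElasticsearchEngineBase is what lets an OpenSearch subclass reuse the same GET/_search and _cat/indices handling while overriding only get_connection().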