diff --git a/cobra/__version__.py b/cobra/__version__.py index 70ae6e63..6ebeb431 100644 --- a/cobra/__version__.py +++ b/cobra/__version__.py @@ -7,7 +7,7 @@ __issue_page__ = 'https://github.com/LoRexxar/Cobra-W/issues/new' __python_version__ = sys.version.split()[0] __platform__ = platform.platform() -__version__ = '1.2.0' +__version__ = '1.3.0' __author__ = 'LoRexxar' __author_email__ = 'LoRexxar@gmail.com' __license__ = 'MIT License' diff --git a/cobra/cast.py b/cobra/cast.py index fbdfd2ae..2b76985b 100644 --- a/cobra/cast.py +++ b/cobra/cast.py @@ -241,17 +241,17 @@ def is_controllable_param(self): logger.debug("[Deep AST] Start AST for param {param_name}".format(param_name=param_name)) - _is_co, _cp, expr_lineno = anlysis_params(param_name, self.file_path, self.line, self.sr.vul_function, self.repair_functions) + _is_co, _cp, expr_lineno, chain = anlysis_params(param_name, self.file_path, self.line, self.sr.vul_function, self.repair_functions, isexternal=True) if _is_co == 1: logger.debug("[AST] Is assign string: `Yes`") - return True, _cp + return True, _is_co, _cp, chain elif _is_co == 3: - logger.info("[AST] can't find this param, something error..") - continue + logger.info("[AST] can't find this param, Unconfirmed vulnerable..") + return True, _is_co, _cp, chain elif _is_co == 4: logger.info("[AST] New vul function {}()".format(_cp[0].name)) - return False, tuple([_is_co, _cp]) + return False, _is_co, tuple([_is_co, _cp]), chain else: continue diff --git a/cobra/engine.py b/cobra/engine.py index 92bc4914..d4f14051 100644 --- a/cobra/engine.py +++ b/cobra/engine.py @@ -188,9 +188,11 @@ def store(result): # print data = [] + data2 = [] table = PrettyTable( ['#', 'CVI', 'Rule(ID/Name)', 'Lang/CVE-id', 'Target-File:Line-Number', 'Commit(Author)', 'Source Code Content', 'Analysis']) + table.align = 'l' trigger_rules = [] for idx, x in enumerate(find_vulnerabilities): @@ -201,11 +203,20 @@ def store(result): except AttributeError as e: code_content = x.code_content.decode('utf-8')[:100].strip() row = [idx + 1, x.id, x.rule_name, x.language, trigger, commit, code_content, x.analysis] + row2 = [idx+1, x.chain] + data.append(row) + data2.append(row2) + table.add_row(row) + if x.id not in trigger_rules: logger.debug(' > trigger rule (CVI-{cvi})'.format(cvi=x.id)) trigger_rules.append(x.id) + + # clear + x.chain = "" + diff_rules = list(set(push_rules) - set(trigger_rules)) vn = len(find_vulnerabilities) if vn == 0: @@ -214,9 +225,20 @@ def store(result): logger.info("[SCAN] Trigger Rules: {tr} Vulnerabilities ({vn})\r\n{table}".format(tr=len(trigger_rules), vn=len(find_vulnerabilities), table=table)) + + # 输出chain for all + logger.info("[SCAN] Vulnerabilities Chain list: ") + for d in data2: + logger.info("[SCAN] Vul {}".format(d[0])) + for c in d[1]: + logger.info("[Chain] {}".format(c)) + + logger.info("[SCAN] ending\r\n -------------------------------------------------------------------------") + if len(diff_rules) > 0: logger.info( '[SCAN] Not Trigger Rules ({l}): {r}'.format(l=len(diff_rules), r=','.join(diff_rules))) + # completed running data if s_sid is not None: Running(s_sid).data({ @@ -399,6 +421,12 @@ def process(self): if len(datas) == 3: is_vulnerability, reason, data = datas + + if "New Core" not in reason: + code = "Code: {}".format(origin_vulnerability[2].strip(" ")) + file_path = os.path.normpath(origin_vulnerability[0]) + data.insert(1, ("NewScan", code, origin_vulnerability[0], origin_vulnerability[1])) + elif len(datas) == 2: is_vulnerability, reason = datas else: @@ -407,6 +435,7 @@ def process(self): if is_vulnerability: logger.debug('[CVI-{cvi}] [RET] Found {code}'.format(cvi=self.sr.svid, code=reason)) vulnerability.analysis = reason + vulnerability.chain = data self.rule_vulnerabilities.append(vulnerability) else: if reason == 'New Core': # 新的规则 @@ -688,15 +717,18 @@ def scan(self): logger.debug('[AST] [RET] {c}'.format(c=result)) if len(result) > 0: if result[0]['code'] == 1: # 函数参数可控 - return True, 'Function-param-controllable' + return True, 'Function-param-controllable', result[0]['chain'] + + elif result[0]['code'] == 2: # 漏洞修复 + return False, 'Function-param-controllable but fixed', result[0]['chain'] - if result[0]['code'] == 2: # 漏洞修复 - return False, 'Function-param-controllable but fixed' + elif result[0]['code'] == 3: # 疑似漏洞 + return True, 'Unconfirmed Function-param-controllable', result[0]['chain'] - if result[0]['code'] == -1: # 函数参数不可控 - return False, 'Function-param-uncon' + elif result[0]['code'] == -1: # 函数参数不可控 + return False, 'Function-param-uncon', result[0]['chain'] - if result[0]['code'] == 4: # 新规则生成 + elif result[0]['code'] == 4: # 新规则生成 return False, 'New Core', result[0]['source'] logger.debug('[AST] [CODE] {code}'.format(code=result[0]['code'])) @@ -711,18 +743,15 @@ def scan(self): raise # vustomize-match - param_is_controllable, data = ast.is_controllable_param() + param_is_controllable, code, data, chain = ast.is_controllable_param() if param_is_controllable: logger.debug('[CVI-{cvi}] [PARAM-CONTROLLABLE] Param is controllable'.format(cvi=self.cvi)) - # Repair - # is_repair, data = ast.match(self.rule_repair, self.repair_block) - # if is_repair: - # # fixed - # logger.debug('[CVI-{cvi}] [REPAIR] Vulnerability Fixed'.format(cvi=self.cvi)) - # return False, 'Vulnerability-Fixed(漏洞已修复)' - # else: - # logger.debug('[CVI-{cvi}] [REPAIR] [RET] Not fixed'.format(cvi=self.cvi)) - return True, 'Vustomize-Match' + + if code == 1: + return True, 'Vustomize-Match', chain + elif code ==3: + return False, 'Unconfirmed Vustomize-Match', chain + else: if type(data) is tuple: if int(data[0]) == 4: @@ -760,7 +789,6 @@ def scan(self): return False, 'Exception' - def init_match_rule(data): """ 处理新生成规则初始化正则匹配 @@ -946,8 +974,14 @@ def NewCore(old_single_rule, target_directory, new_rules, files, count=0, secret datas = Core(target_directory, vulnerability, sr, 'project name', ['whitelist1', 'whitelist2'], files=files, secret_name=secret_name).scan() data = "" + if len(datas) == 3: is_vulnerability, reason, data = datas + + if "New Core" not in reason: + code = "Code: {}".format(origin_vulnerability[2]) + data.insert(1, ("NewScan", code, origin_vulnerability[0], origin_vulnerability[1])) + elif len(datas) == 2: is_vulnerability, reason = datas else: @@ -956,6 +990,7 @@ def NewCore(old_single_rule, target_directory, new_rules, files, count=0, secret if is_vulnerability: logger.debug('[CVI-{cvi}] [RET] Found {code}'.format(cvi="00000", code=reason)) vulnerability.analysis = reason + vulnerability.chain = data rule_vulnerabilities.append(vulnerability) else: if reason == 'New Core': # 新的规则 diff --git a/cobra/parser.py b/cobra/parser.py index db45b38d..499afb39 100644 --- a/cobra/parser.py +++ b/cobra/parser.py @@ -17,12 +17,14 @@ from .log import logger from .pretreatment import ast_object import re +import os import codecs import traceback with_line = True scan_results = [] # 结果存放列表初始化 is_repair_functions = [] # 修复函数初始化 +scan_chain = [] # 回溯链变量 def export(items): @@ -259,7 +261,7 @@ def is_repair(expr): is_re = False # 是否修复,默认值是未修复 global is_repair_functions if expr in is_repair_functions: - logger.debug("[AST] function {} in is_repair_functions, The vulnerability does not exist ") + logger.debug("[AST] function {} in is_repair_functions, The vulnerability does not exist ".format(expr)) is_re = True return is_re @@ -360,9 +362,11 @@ def is_controllable(expr, flag=None): # 获取表达式中的变量,看是否 # return is_co, cp, expr_lineno -def function_back(param, nodes, function_params, vul_function=None): # 回溯函数定义位置 +def function_back(param, nodes, function_params, vul_function=None, file_path=None, isback=None): # 回溯函数定义位置 """ 递归回溯函数定义位置,传入param类型不同 + :param isback: + :param file_path: :param function_params: :param vul_function: :param param: @@ -386,14 +390,17 @@ def function_back(param, nodes, function_params, vul_function=None): # 回溯 return_node = function_node.node return_param = return_node.node is_co, cp, expr_lineno = parameters_back(return_param, function_nodes, function_params, - vul_function=vul_function) + vul_function=vul_function, file_path=file_path, + isback=isback) return is_co, cp, expr_lineno -def array_back(param, nodes, vul_function=None): # 回溯数组定义赋值 +def array_back(param, nodes, vul_function=None, file_path=None, isback=None): # 回溯数组定义赋值 """ 递归回溯数组赋值定义 + :param isback: + :param file_path: :param vul_function: :param param: :param nodes: @@ -421,31 +428,37 @@ def array_back(param, nodes, vul_function=None): # 回溯数组定义赋值 is_co, cp = is_controllable(p_node.value.node.name) if is_co != 1: - is_co, cp, expr_lineno = array_back(param, nodes) + is_co, cp, expr_lineno = array_back(param, nodes, file_path=file_path, + isback=isback) else: n_node = php.Variable(p_node.value) - is_co, cp, expr_lineno = parameters_back(n_node, nodes, vul_function=vul_function) + is_co, cp, expr_lineno = parameters_back(n_node, nodes, vul_function=vul_function, file_path=file_path, + isback=isback) if param == param_node: # 处理数组一次性赋值,左值为数组 if isinstance(param_node_expr, php.ArrayOffset): # 如果赋值值仍然是数组,先经过判断在进入递归 is_co, cp = is_controllable(param_node_expr.node.name) if is_co != 1: - is_co, cp, expr_lineno = array_back(param, nodes) + is_co, cp, expr_lineno = array_back(param, nodes, file_path=file_path, + isback=isback) else: is_co, cp = is_controllable(param_node_expr) if is_co != 1 and is_co != -1: n_node = php.Variable(param_node_expr.node.value) - is_co, cp, expr_lineno = parameters_back(n_node, nodes, vul_function=vul_function) + is_co, cp, expr_lineno = parameters_back(n_node, nodes, vul_function=vul_function, file_path=file_path, + isback=isback) return is_co, cp, expr_lineno -def class_back(param, node, lineno, vul_function=None): +def class_back(param, node, lineno, vul_function=None, file_path=None, isback=None): """ 回溯类中变量 + :param isback: + :param file_path: :param vul_function: :param param: :param node: @@ -460,7 +473,8 @@ def class_back(param, node, lineno, vul_function=None): if class_node.lineno < int(lineno): vul_nodes.append(class_node) - is_co, cp, expr_lineno = parameters_back(param, vul_nodes, lineno=lineno, vul_function=vul_function) + is_co, cp, expr_lineno = parameters_back(param, vul_nodes, lineno=lineno, vul_function=vul_function, file_path=file_path, + isback=isback) if is_co == 1 or is_co == -1: # 可控或者不可控,直接返回 return is_co, cp, expr_lineno @@ -472,7 +486,8 @@ def class_back(param, node, lineno, vul_function=None): # 递归析构函数 is_co, cp, expr_lineno = parameters_back(param, constructs_nodes, function_params=class_node_params, - lineno=lineno, vul_function=vul_function) + lineno=lineno, vul_function=vul_function, file_path=file_path, + isback=isback) if is_co == 3: # 回溯输入参数 @@ -489,9 +504,11 @@ def class_back(param, node, lineno, vul_function=None): return is_co, cp, expr_lineno -def new_class_back(param, nodes, vul_function=None): +def new_class_back(param, nodes, vul_function=None, file_path=None, isback=None): """ 分析新建的class,自动进入tostring函数 + :param isback: + :param file_path: :param vul_function: :param param: :param nodes: @@ -518,7 +535,8 @@ def new_class_back(param, nodes, vul_function=None): if isinstance(tostring_node, php.Return): return_param = tostring_node.node is_co, cp, expr_lineno = parameters_back(return_param, tostring_nodes, - vul_function=vul_function) + vul_function=vul_function, file_path=file_path, + isback=isback) return is_co, cp, expr_lineno else: @@ -529,9 +547,10 @@ def new_class_back(param, nodes, vul_function=None): def parameters_back(param, nodes, function_params=None, lineno=0, - function_flag=0, vul_function=None, isback=None): # 用来得到回溯过程中的被赋值的变量是否与敏感函数变量相等,param是当前需要跟踪的污点 + function_flag=0, vul_function=None, file_path=None, isback=None): # 用来得到回溯过程中的被赋值的变量是否与敏感函数变量相等,param是当前需要跟踪的污点 """ 递归回溯敏感函数的赋值流程,param为跟踪的污点,当找到param来源时-->分析复制表达式-->获取新污点;否则递归下一个节点 + :param file_path: :param vul_function: :param param: :param nodes: @@ -541,20 +560,22 @@ def parameters_back(param, nodes, function_params=None, lineno=0, :param isback: 是否需要返回该值 :return: """ + global scan_chain if isinstance(param, php.FunctionCall) or isinstance(param, php.MethodCall): # 当污点为寻找函数时,递归进入寻找函数 logger.debug("[AST] AST analysis for FunctionCall or MethodCall {} in line {}".format(param.name, param.lineno)) - is_co, cp, expr_lineno = function_back(param, nodes, function_params) + is_co, cp, expr_lineno = function_back(param, nodes, function_params, file_path=file_path, isback=isback) return is_co, cp, expr_lineno if isinstance(param, php.ArrayOffset): # 当污点为数组时,递归进入寻找数组声明或赋值 logger.debug("[AST] AST analysis for ArrayOffset in line {}".format(param.lineno)) - is_co, cp, expr_lineno = array_back(param, nodes) + is_co, cp, expr_lineno = array_back(param, nodes, file_path=file_path, isback=isback) return is_co, cp, expr_lineno if isinstance(param, php.New) or (hasattr(param, "name") and isinstance(param.name, php.New)): # 当污点为新建类事,进入类中tostring函数分析 logger.debug("[AST] AST analysis for New Class {} in line {}".format(param.name, param.lineno)) - is_co, cp, expr_lineno = new_class_back(param, nodes) + is_co, cp, expr_lineno = new_class_back(param, nodes, file_path=file_path, + isback=isback) return is_co, cp, expr_lineno expr_lineno = 0 # source所在行号 @@ -566,10 +587,10 @@ def parameters_back(param, nodes, function_params=None, lineno=0, is_co, cp = is_controllable(param_name) - if len(nodes) != 0: + if len(nodes) != 0 and is_co != 1: node = nodes[len(nodes) - 1] - if isinstance(node, php.Assignment): # 回溯的过程中,对出现赋值情况的节点进行跟踪 + if isinstance(node, php.Assignment) and param_name == get_node_name(node.node): # 回溯的过程中,对出现赋值情况的节点进行跟踪 param_node = get_node_name(node.node) # param_node为被赋值的变量 param_expr, expr_lineno, is_re = get_expr_name(node.expr) # param_expr为赋值表达式,param_expr为变量或者列表 @@ -582,15 +603,22 @@ def parameters_back(param, nodes, function_params=None, lineno=0, logger.debug( "[AST] Find {}={} in line {}, start ast for param {}".format(param_name, param_expr, expr_lineno, param_expr)) + + file_path = os.path.normpath(file_path) + code = "{}={}".format(param_name, param_expr) + scan_chain.append(('Assignment', code, file_path, node.lineno)) + is_co, cp = is_controllable(param_expr) # 开始判断变量是否可控 - if is_co == -1 and isback is True: - cp = param_expr + if is_co == 1: return is_co, cp, expr_lineno if is_co != 1 and is_co != 3: is_co, cp = is_sink_function(param_expr, function_params) + if is_co == -1 and isback is True: + cp = param_expr + if isinstance(node.expr, php.ArrayOffset): param = node.expr else: @@ -605,6 +633,9 @@ def parameters_back(param, nodes, function_params=None, lineno=0, function_name, node.lineno, function_name)) + file_path = os.path.normpath(file_path) + code = "{}={}".format(param_name, param_expr) + scan_chain.append(('FunctionCall', code, file_path, node.lineno)) for node in nodes[::-1]: if isinstance(node, php.Function): @@ -618,7 +649,8 @@ def parameters_back(param, nodes, function_params=None, lineno=0, return_param = return_node.node is_co, cp, expr_lineno = parameters_back(return_param, function_nodes, function_params, lineno, function_flag=1, - vul_function=vul_function, isback=isback) + vul_function=vul_function, file_path=file_path, + isback=isback) if param_name == param_node and isinstance(param_expr, list): logger.debug( @@ -626,6 +658,10 @@ def parameters_back(param, nodes, function_params=None, lineno=0, param_expr, node.lineno, param_expr)) + file_path = os.path.normpath(file_path) + code = "{}={}".format(param_name, param_expr) + scan_chain.append(('ListAssignment', code, file_path, node.lineno)) + for expr in param_expr: param = expr is_co, cp = is_controllable(expr) @@ -635,7 +671,8 @@ def parameters_back(param, nodes, function_params=None, lineno=0, param = php.Variable(param) _is_co, _cp, expr_lineno = parameters_back(param, nodes[:-1], function_params, lineno, - function_flag=1, vul_function=vul_function, isback=isback) + function_flag=1, vul_function=vul_function, file_path=file_path, + isback=isback) if _is_co != -1: # 当参数可控时,值赋给is_co 和 cp,有一个参数可控,则认定这个函数可能可控 is_co = _is_co @@ -647,19 +684,33 @@ def parameters_back(param, nodes, function_params=None, lineno=0, function_params = node.params vul_nodes = [] + # 如果仅仅是函数定义,如果上一次赋值语句不在函数内,那么不应进去函数里分析,应该直接跳过这部分 + # test1 尝试使用行数叠加的方式 + if len(function_nodes) + function_lineno < int(lineno): + is_co, cp, expr_lineno = parameters_back(param, nodes[:-1], function_params, lineno, + function_flag=0, vul_function=vul_function, + file_path=file_path, + isback=isback) + return is_co, cp, expr_lineno + logger.debug( "[AST] param {} line {} in function {} line {}, start ast in function".format(param_name, node.lineno, node.name, function_lineno)) + file_path = os.path.normpath(file_path) + code = "param {} in function {}".format(param_name, node.name) + scan_chain.append(('Function', code, file_path, node.lineno)) + for function_node in function_nodes: if function_node is not None and int(function_lineno) <= function_node.lineno < int(lineno): vul_nodes.append(function_node) if len(vul_nodes) > 0: is_co, cp, expr_lineno = parameters_back(param, function_nodes, function_params, function_lineno, - function_flag=1, vul_function=vul_function, isback=isback) + function_flag=1, vul_function=vul_function, file_path=file_path, + isback=isback) if is_co == 3: # 出现新的敏感函数,重新生成新的漏洞结构,进入新的遍历结构 for node_param in node.params: @@ -668,6 +719,10 @@ def parameters_back(param, nodes, function_params=None, lineno=0, "[AST] param {} line {} in function_params, start new rule for function {}".format( param_name, node.lineno, node.name)) + file_path = os.path.normpath(file_path) + code = "param {} from NewFunction {}".format(param_name, node.name) + scan_chain.append(('NewFunction', code, file_path, node.lineno)) + if vul_function is None or node.name != vul_function: logger.info( "[Deep AST] Now vulnerability function from function {}() param {}".format(node.name, @@ -685,7 +740,7 @@ def parameters_back(param, nodes, function_params=None, lineno=0, return is_co, cp, 0 elif isinstance(node, php.Class): - is_co, cp, expr_lineno = class_back(param, node, lineno, vul_function=vul_function) + is_co, cp, expr_lineno = class_back(param, node, lineno, vul_function=vul_function, file_path=file_path, isback=isback) return is_co, cp, expr_lineno elif isinstance(node, php.If): @@ -704,11 +759,13 @@ def parameters_back(param, nodes, function_params=None, lineno=0, # 进入分析if内的代码块,如果返回参数不同于进入参数,那么在不同的代码块中,变量值不同,不能统一处理,需要递归进入不同的部分 is_co, cp, expr_lineno = parameters_back(param, if_nodes, function_params, if_node_lineno, - function_flag=1, vul_function=vul_function, isback=isback) + function_flag=1, vul_function=vul_function, + file_path=file_path, isback=isback) if is_co == 3 and cp != param: # 理由如上 is_co, cp, expr_lineno = parameters_back(param, nodes[:-1], function_params, lineno, - function_flag=1, vul_function=vul_function, isback=isback) # 找到可控的输入时,停止递归 + function_flag=1, vul_function=vul_function, + file_path=file_path, isback=isback) # 找到可控的输入时,停止递归 return is_co, cp, expr_lineno if is_co is not 1 and node.elseifs != []: # elseif可能有多个,所以需要列表 @@ -725,12 +782,13 @@ def parameters_back(param, nodes, function_params=None, lineno=0, elif_node_lineno = 0 is_co, cp, expr_lineno = parameters_back(param, elif_nodes, function_params, elif_node_lineno, - function_flag=1, vul_function=vul_function, isback=isback) + function_flag=1, vul_function=vul_function, file_path=file_path, + isback=isback) if is_co == 3 and cp != param: # 理由如上 is_co, cp, expr_lineno = parameters_back(param, nodes[:-1], function_params, lineno, - function_flag=1, - vul_function=vul_function, isback=isback) # 找到可控的输入时,停止递归 + function_flag=1, vul_function=vul_function, file_path=file_path, + isback=isback) # 找到可控的输入时,停止递归 return is_co, cp, expr_lineno else: break @@ -747,12 +805,13 @@ def parameters_back(param, nodes, function_params=None, lineno=0, else_node_lineno = 0 is_co, cp, expr_lineno = parameters_back(param, else_nodes, function_params, else_node_lineno, - function_flag=1, vul_function=vul_function, isback=isback) + function_flag=1, vul_function=vul_function, + file_path=file_path, isback=isback) if is_co == 3 and cp != param: # 理由如上 is_co, cp, expr_lineno = parameters_back(param, nodes[:-1], function_params, lineno, - function_flag=1, - vul_function=vul_function, isback=isback) # 找到可控的输入时,停止递归 + function_flag=1, vul_function=vul_function, file_path=file_path, + isback=isback) # 找到可控的输入时,停止递归 return is_co, cp, expr_lineno elif isinstance(node, php.For): @@ -763,11 +822,13 @@ def parameters_back(param, nodes, function_params=None, lineno=0, "[AST] param {} line {} in for, start ast in for".format(param_name, for_node_lineno)) is_co, cp, expr_lineno = parameters_back(param, for_nodes, function_params, for_node_lineno, - function_flag=1, vul_function=vul_function, isback=isback) + function_flag=1, vul_function=vul_function, file_path=file_path, + isback=isback) if is_co == 3 or int(lineno) == node.lineno: # 当is_co为True时找到可控,停止递归 is_co, cp, expr_lineno = parameters_back(param, nodes[:-1], function_params, lineno, - function_flag=1, vul_function=vul_function, isback=isback) # 找到可控的输入时,停止递归 + function_flag=1, vul_function=vul_function, file_path=file_path, + isback=isback) # 找到可控的输入时,停止递归 elif len(nodes) == 0 and function_params is not None: # 当敏感函数在函数中时,function_params不为空,这时应进入自定义敏感函数逻辑 for function_param in function_params: @@ -795,7 +856,7 @@ def deep_parameters_back(param, back_node, function_params, count, file_path, li count += 1 padding = {} - is_co, cp, expr_lineno = parameters_back(param, back_node, function_params, lineno, vul_function=vul_function, isback=isback) + is_co, cp, expr_lineno = parameters_back(param, back_node, function_params, lineno, vul_function=vul_function, file_path=file_path, isback=isback) if count > 20: logger.warning("[Deep AST] depth too big, auto exit...") @@ -817,7 +878,11 @@ def deep_parameters_back(param, back_node, function_params, count, file_path, li # 主要解决两个问题,一个是全局define,一个是变量 if isinstance(param, php.Variable): logger.debug("[AST][INCLUDE] The include file name has an unknown parameter {}.".format(param)) - # print(back_node[:back_node.index(node)]) + + file_path = os.path.normpath(file_path) + code = "find {} in Include path".format(param, file_path) + scan_chain.append(('IncludePath', code, file_path, node.lineno)) + is_co, ccp, expr_lineno = deep_parameters_back(param, back_node[:back_node.index(node)], function_params, count, file_path, lineno, vul_function=vul_function, isback=True) @@ -844,29 +909,22 @@ def deep_parameters_back(param, back_node, function_params, count, file_path, li try: logger.debug("[Deep AST] open new file {file_path}".format(file_path=file_path_name)) - # f = open(file_path_name, 'r') - # f = codecs.open(file_path_name, "r", encoding='utf-8', errors='ignore') - # file_content = f.read() + all_nodes = ast_object.get_nodes(file_path_name) except: logger.warning("[Deep AST] error to open new file...continue") continue - # try: - # # 目标可能语法错误 - # parser = make_parser() - # except SyntaxError: - # logger.warning('[AST] target php file exist SyntaxError...') - # logger.warning('[AST] [ERROR]:{e}'.format(e=traceback.format_exc())) - - # all_nodes = parser.parse(file_content, debug=False, lexer=lexer.clone(), tracking=with_line) node = cp - # node = php.Variable(cp) + + file_path = os.path.normpath(file_path) + code = "find {} in Include {}".format(node, file_path_name) + scan_chain.append(('Include', code, file_path, node.lineno)) is_co, cp, expr_lineno = deep_parameters_back(node, all_nodes, function_params, count, file_path_name, lineno, vul_function=vul_function, isback=isback) - if is_co == -1: + if is_co == -1 or is_co == 1: break return is_co, cp, expr_lineno @@ -906,18 +964,18 @@ def get_function_params(nodes): return params -def anlysis_params(param, file_path, lineno, vul_function=None, repair_functions=None): +def anlysis_params(param, file_path, lineno, vul_function=None, repair_functions=None, isexternal=False): """ 在cast调用时做中转数据预处理 :param repair_functions: :param vul_function: :param lineno: :param param: - :param code_content: + :param isexternal: 是否外部调用 :param file_path: :return: """ - global is_repair_functions + global is_repair_functions, scan_chain count = 0 function_params = None if repair_functions is not None: @@ -928,16 +986,10 @@ def anlysis_params(param, file_path, lineno, vul_function=None, repair_functions param_right = param.split("->")[1] param = php.ObjectProperty(param_left, param_right) + if isexternal: + scan_chain = ['start'] + param = php.Variable(param) - # try: - # # 目标可能语法错误 - # parser = make_parser() - # except SyntaxError: - # logger.warning('[AST] target php file exist SyntaxError...') - # logger.warning('[AST] [ERROR]:{e}'.format(e=traceback.format_exc())) - # return -1, "", "" - # - # all_nodes = parser.parse(code_content, debug=False, lexer=lexer.clone(), tracking=with_line) all_nodes = ast_object.get_nodes(file_path) # 做一次处理,解决Variable(Variable('$id'))的问题 @@ -945,6 +997,8 @@ def anlysis_params(param, file_path, lineno, vul_function=None, repair_functions param = param.name logger.debug("[AST] AST to find param {}".format(param)) + code = "find param {}".format(param) + scan_chain.append(('NewFind', code, file_path, lineno)) vul_nodes = [] for node in all_nodes: @@ -954,7 +1008,7 @@ def anlysis_params(param, file_path, lineno, vul_function=None, repair_functions is_co, cp, expr_lineno = deep_parameters_back(param, vul_nodes, function_params, count, file_path, lineno, vul_function=vul_function) - return is_co, cp, expr_lineno + return is_co, cp, expr_lineno, scan_chain def anlysis_function(node, back_node, vul_function, function_params, vul_lineno, file_path=None): @@ -1041,11 +1095,7 @@ def analysis_binaryop_node(node, back_node, vul_function, vul_lineno, function_p # is_co, cp, expr_lineno = parameters_back(param, back_node, function_params) if file_path is not None: - # with open(file_path, 'r') as fi: - # fi = codecs.open(file_path, 'r', encoding='utf-8', errors='ignore') - # code_content = fi.read() - is_co, cp, expr_lineno = anlysis_params(param, file_path, param_lineno, - vul_function=vul_function) + is_co, cp, expr_lineno, chain = anlysis_params(param, file_path, param_lineno, vul_function=vul_function) else: count = 0 is_co, cp, expr_lineno = deep_parameters_back(node, back_node, function_params, count, file_path, @@ -1076,7 +1126,7 @@ def analysis_objectproperry_node(node, back_node, vul_function, vul_lineno, func # fi = codecs.open(file_path, 'r', encoding='utf-8', errors='ignore') # code_content = fi.read() - is_co, cp, expr_lineno = anlysis_params(param, file_path, param_lineno, vul_function=vul_function) + is_co, cp, expr_lineno, chain = anlysis_params(param, file_path, param_lineno, vul_function=vul_function) else: count = 0 is_co, cp, expr_lineno = deep_parameters_back(node, back_node, function_params, count, @@ -1117,15 +1167,9 @@ def analysis_functioncall_node(node, back_node, vul_function, vul_lineno, functi for param in params: param = php.Variable(param) param_lineno = node.lineno - # is_co, cp, expr_lineno = parameters_back(param, back_node, function_params) if file_path is not None: - # with open(file_path, 'r') as fi: - # fi = codecs.open(file_path, 'r', encoding='utf-8', errors='ignore') - # code_content = fi.read() - - is_co, cp, expr_lineno = anlysis_params(param, file_path, param_lineno, - vul_function=vul_function) + is_co, cp, expr_lineno, chain = anlysis_params(param, file_path, param_lineno, vul_function=vul_function) else: count = 0 is_co, cp, expr_lineno = deep_parameters_back(node, back_node, function_params, count, file_path, @@ -1150,11 +1194,8 @@ def analysis_variable_node(node, back_node, vul_function, vul_lineno, function_p param_lineno = node.lineno if file_path is not None: - # with open(file_path, 'r') as fi: - # fi = codecs.open(file_path, 'r', encoding='utf-8', errors='ignore') - # code_content = fi.read() - is_co, cp, expr_lineno = anlysis_params(param, file_path, param_lineno, vul_function=vul_function) + is_co, cp, expr_lineno, chain = anlysis_params(param, file_path, param_lineno, vul_function=vul_function) else: count = 0 is_co, cp, expr_lineno = deep_parameters_back(node, back_node, function_params, count, file_path, @@ -1359,7 +1400,7 @@ def analysis_file_inclusion(node, vul_function, back_node, vul_lineno, function_ :param back_node: :param vul_lineno: :param function_params: - :return: + :return: """ global scan_results include_fs = ['include', 'include_once', 'require', 'require_once'] @@ -1397,7 +1438,7 @@ def set_scan_results(is_co, cp, expr_lineno, sink, param, vul_lineno): :return: """ results = [] - global scan_results + global scan_results, scan_chain result = { 'code': is_co, @@ -1405,7 +1446,8 @@ def set_scan_results(is_co, cp, expr_lineno, sink, param, vul_lineno): 'source_lineno': expr_lineno, 'sink': sink, 'sink_param:': param, - 'sink_lineno': vul_lineno + 'sink_lineno': vul_lineno, + "chain": scan_chain, } if result['code'] > 0: # 查出来漏洞结果添加到结果信息中 results.append(result) @@ -1479,14 +1521,15 @@ def scan_parser(sensitive_func, vul_lineno, file_path, repair_functions=[]): """ 开始检测函数 :param repair_functions: - :param code_content: 要检测的文件内容 :param sensitive_func: 要检测的敏感函数,传入的为函数列表 :param vul_lineno: 漏洞函数所在行号 :param file_path: 文件路径 :return: """ try: - global scan_results, is_repair_functions + global scan_results, is_repair_functions, scan_chain + + scan_chain = ['start'] scan_results = [] is_repair_functions = repair_functions all_nodes = ast_object.get_nodes(file_path) @@ -1494,6 +1537,12 @@ def scan_parser(sensitive_func, vul_lineno, file_path, repair_functions=[]): for func in sensitive_func: # 循环判断代码中是否存在敏感函数,若存在,递归判断参数是否可控;对文件内容循环判断多次 back_node = [] analysis(all_nodes, func, back_node, int(vul_lineno), file_path, function_params=None) + + # 如果检测到一次,那么就可以退出了 + if len(scan_results) > 0: + logger.debug("[AST] Scan parser end for {}".format(scan_results)) + break + except SyntaxError as e: logger.warning('[AST] [ERROR]:{e}'.format(e=traceback.format_exc())) diff --git a/cobra/result.py b/cobra/result.py index 99126a40..53f9ca61 100644 --- a/cobra/result.py +++ b/cobra/result.py @@ -18,6 +18,7 @@ def __init__(self): self.id = '' self.file_path = None self.analysis = '' + self.chain = "" self.rule_name = '' self.language = '' diff --git a/docs/changelog.md b/docs/changelog.md index ccec9243..375e4cf7 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -84,4 +84,12 @@ - Cobra-W 1.2.0 - 修复了include节点中出现变量,无法正确回溯的问题 - 花大代价尝试重构关于ast处理部分,把ast处理整体提出 - - 解决了之前无法检索define全局变量的问题 \ No newline at end of file + - 解决了之前无法检索define全局变量的问题 +- 2019-04-19 + - Cobra-W 1.3.0 + - 添加了调用链展示功能 + - 试探性的加入了疑似漏洞在漏洞表内 + - 修复了上版本中如果路径错误会导致获取node失败的问题 + - 修复了针对function节点递归检测异常的问题 + - 修复了针对function-regex模式即使匹配到敏感函数之后还会继续匹配的问题 + \ No newline at end of file diff --git a/tests/ast/test_function/test_function.php b/tests/ast/test_function/test_function.php index 3fc0dcc1..f3be0972 100644 --- a/tests/ast/test_function/test_function.php +++ b/tests/ast/test_function/test_function.php @@ -12,7 +12,22 @@ function b($a){ eval($a); } - b($s3); +function curl($url){ + $ch = curl_init(); + curl_setopt($ch, CURLOPT_URL, $url); + curl_setopt($ch, CURLOPT_HEADER, 0); + curl_exec($ch); + curl_close($ch); +} + +$url = $_GET['url']; +if (!empty($url)){ + curl($cmd); +} + +eval($cmd); + +b($s3); $id = addslashes($_GET['id']); $id2 = $_GET['id'];