Skip to content

Commit

Permalink
update 1.3.0
Browse files Browse the repository at this point in the history
update 1.3.0
  • Loading branch information
LoRexxar authored Apr 19, 2019
2 parents 949fa32 + 3c7acee commit 8f55e56
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 110 deletions.
2 changes: 1 addition & 1 deletion cobra/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
__issue_page__ = 'https://github.com/LoRexxar/Cobra-W/issues/new'
__python_version__ = sys.version.split()[0]
__platform__ = platform.platform()
__version__ = '1.2.0'
__version__ = '1.3.0'
__author__ = 'LoRexxar'
__author_email__ = '[email protected]'
__license__ = 'MIT License'
Expand Down
10 changes: 5 additions & 5 deletions cobra/cast.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,17 +241,17 @@ def is_controllable_param(self):

logger.debug("[Deep AST] Start AST for param {param_name}".format(param_name=param_name))

_is_co, _cp, expr_lineno = anlysis_params(param_name, self.file_path, self.line, self.sr.vul_function, self.repair_functions)
_is_co, _cp, expr_lineno, chain = anlysis_params(param_name, self.file_path, self.line, self.sr.vul_function, self.repair_functions, isexternal=True)

if _is_co == 1:
logger.debug("[AST] Is assign string: `Yes`")
return True, _cp
return True, _is_co, _cp, chain
elif _is_co == 3:
logger.info("[AST] can't find this param, something error..")
continue
logger.info("[AST] can't find this param, Unconfirmed vulnerable..")
return True, _is_co, _cp, chain
elif _is_co == 4:
logger.info("[AST] New vul function {}()".format(_cp[0].name))
return False, tuple([_is_co, _cp])
return False, _is_co, tuple([_is_co, _cp]), chain
else:
continue

Expand Down
69 changes: 52 additions & 17 deletions cobra/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,11 @@ def store(result):

# print
data = []
data2 = []
table = PrettyTable(
['#', 'CVI', 'Rule(ID/Name)', 'Lang/CVE-id', 'Target-File:Line-Number',
'Commit(Author)', 'Source Code Content', 'Analysis'])

table.align = 'l'
trigger_rules = []
for idx, x in enumerate(find_vulnerabilities):
Expand All @@ -201,11 +203,20 @@ def store(result):
except AttributeError as e:
code_content = x.code_content.decode('utf-8')[:100].strip()
row = [idx + 1, x.id, x.rule_name, x.language, trigger, commit, code_content, x.analysis]
row2 = [idx+1, x.chain]

data.append(row)
data2.append(row2)

table.add_row(row)

if x.id not in trigger_rules:
logger.debug(' > trigger rule (CVI-{cvi})'.format(cvi=x.id))
trigger_rules.append(x.id)

# clear
x.chain = ""

diff_rules = list(set(push_rules) - set(trigger_rules))
vn = len(find_vulnerabilities)
if vn == 0:
Expand All @@ -214,9 +225,20 @@ def store(result):
logger.info("[SCAN] Trigger Rules: {tr} Vulnerabilities ({vn})\r\n{table}".format(tr=len(trigger_rules),
vn=len(find_vulnerabilities),
table=table))

# 输出chain for all
logger.info("[SCAN] Vulnerabilities Chain list: ")
for d in data2:
logger.info("[SCAN] Vul {}".format(d[0]))
for c in d[1]:
logger.info("[Chain] {}".format(c))

logger.info("[SCAN] ending\r\n -------------------------------------------------------------------------")

if len(diff_rules) > 0:
logger.info(
'[SCAN] Not Trigger Rules ({l}): {r}'.format(l=len(diff_rules), r=','.join(diff_rules)))

# completed running data
if s_sid is not None:
Running(s_sid).data({
Expand Down Expand Up @@ -399,6 +421,12 @@ def process(self):

if len(datas) == 3:
is_vulnerability, reason, data = datas

if "New Core" not in reason:
code = "Code: {}".format(origin_vulnerability[2].strip(" "))
file_path = os.path.normpath(origin_vulnerability[0])
data.insert(1, ("NewScan", code, origin_vulnerability[0], origin_vulnerability[1]))

elif len(datas) == 2:
is_vulnerability, reason = datas
else:
Expand All @@ -407,6 +435,7 @@ def process(self):
if is_vulnerability:
logger.debug('[CVI-{cvi}] [RET] Found {code}'.format(cvi=self.sr.svid, code=reason))
vulnerability.analysis = reason
vulnerability.chain = data
self.rule_vulnerabilities.append(vulnerability)
else:
if reason == 'New Core': # 新的规则
Expand Down Expand Up @@ -688,15 +717,18 @@ def scan(self):
logger.debug('[AST] [RET] {c}'.format(c=result))
if len(result) > 0:
if result[0]['code'] == 1: # 函数参数可控
return True, 'Function-param-controllable'
return True, 'Function-param-controllable', result[0]['chain']

elif result[0]['code'] == 2: # 漏洞修复
return False, 'Function-param-controllable but fixed', result[0]['chain']

if result[0]['code'] == 2: # 漏洞修复
return False, 'Function-param-controllable but fixed'
elif result[0]['code'] == 3: # 疑似漏洞
return True, 'Unconfirmed Function-param-controllable', result[0]['chain']

if result[0]['code'] == -1: # 函数参数不可控
return False, 'Function-param-uncon'
elif result[0]['code'] == -1: # 函数参数不可控
return False, 'Function-param-uncon', result[0]['chain']

if result[0]['code'] == 4: # 新规则生成
elif result[0]['code'] == 4: # 新规则生成
return False, 'New Core', result[0]['source']

logger.debug('[AST] [CODE] {code}'.format(code=result[0]['code']))
Expand All @@ -711,18 +743,15 @@ def scan(self):
raise

# vustomize-match
param_is_controllable, data = ast.is_controllable_param()
param_is_controllable, code, data, chain = ast.is_controllable_param()
if param_is_controllable:
logger.debug('[CVI-{cvi}] [PARAM-CONTROLLABLE] Param is controllable'.format(cvi=self.cvi))
# Repair
# is_repair, data = ast.match(self.rule_repair, self.repair_block)
# if is_repair:
# # fixed
# logger.debug('[CVI-{cvi}] [REPAIR] Vulnerability Fixed'.format(cvi=self.cvi))
# return False, 'Vulnerability-Fixed(漏洞已修复)'
# else:
# logger.debug('[CVI-{cvi}] [REPAIR] [RET] Not fixed'.format(cvi=self.cvi))
return True, 'Vustomize-Match'

if code == 1:
return True, 'Vustomize-Match', chain
elif code ==3:
return False, 'Unconfirmed Vustomize-Match', chain

else:
if type(data) is tuple:
if int(data[0]) == 4:
Expand Down Expand Up @@ -760,7 +789,6 @@ def scan(self):
return False, 'Exception'



def init_match_rule(data):
"""
处理新生成规则初始化正则匹配
Expand Down Expand Up @@ -946,8 +974,14 @@ def NewCore(old_single_rule, target_directory, new_rules, files, count=0, secret
datas = Core(target_directory, vulnerability, sr, 'project name',
['whitelist1', 'whitelist2'], files=files, secret_name=secret_name).scan()
data = ""

if len(datas) == 3:
is_vulnerability, reason, data = datas

if "New Core" not in reason:
code = "Code: {}".format(origin_vulnerability[2])
data.insert(1, ("NewScan", code, origin_vulnerability[0], origin_vulnerability[1]))

elif len(datas) == 2:
is_vulnerability, reason = datas
else:
Expand All @@ -956,6 +990,7 @@ def NewCore(old_single_rule, target_directory, new_rules, files, count=0, secret
if is_vulnerability:
logger.debug('[CVI-{cvi}] [RET] Found {code}'.format(cvi="00000", code=reason))
vulnerability.analysis = reason
vulnerability.chain = data
rule_vulnerabilities.append(vulnerability)
else:
if reason == 'New Core': # 新的规则
Expand Down
Loading

0 comments on commit 8f55e56

Please sign in to comment.