fix: checkmarx parser aggregation and deduplication with query id
This should fix DefectDojo#3958.
The aggregation mechanism and the deduplication mechanism for Checkmarx now use the same fields.
The Checkmarx query id is now included in the hash code, so that a separate issue is no longer created for each Checkmarx "result".
Aggregation is kept, but duplicates can no longer be found inside a single report.
jcaillon committed Dec 1, 2021
1 parent 9615234 commit 2d3c1bd
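
As a rough illustration of the change described above, here is a minimal standalone sketch (not DefectDojo code): two Checkmarx "result" entries with the same CWE, severity and sink file now collapse into a single aggregated finding, and the Checkmarx query id is carried in vuln_id_from_tool. The sample values and the dictionary structure are made up; only the key fields and the comma-joining of query ids mirror the diff below.

    # Sketch of the new aggregation behaviour; values are invented.
    results = [
        {"cwe": "79", "severity": "High", "sink_file": "app/views.py", "query_id": "591"},
        {"cwe": "79", "severity": "High", "sink_file": "app/views.py", "query_id": "591"},
    ]

    dupes = {}
    for result in results:
        # New aggregate key, mirroring: aggregateKeys = "{}{}{}".format(cwe, sev, sinkFilename)
        key = "{}{}{}".format(result["cwe"], result["severity"], result["sink_file"])
        if key not in dupes:
            dupes[key] = {"nb_occurences": 1, "vuln_id_from_tool": result["query_id"]}
        else:
            dupes[key]["nb_occurences"] += 1
            # Query ids are comma-joined, as in the updated parser
            dupes[key]["vuln_id_from_tool"] = "{},{}".format(
                dupes[key]["vuln_id_from_tool"], result["query_id"])

    print(dupes)  # one aggregated entry with nb_occurences == 2 and vuln_id_from_tool == "591,591"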
Showing 3 changed files with 1,568 additions and 37 deletions.
32 changes: 19 additions & 13 deletions dojo/tools/checkmarx/parser.py
@@ -61,7 +61,7 @@ def get_findings(self, filename, test):
         # - value: the finding
         dupes = dict()
         for query in root.findall('Query'):
-            name, cwe, categories = self.getQueryElements(query)
+            name, cwe, categories, queryId = self.getQueryElements(query)
             language = ''
             findingdetail = ''
             group = ''
@@ -89,7 +89,7 @@ def get_findings(self, filename, test):
                 findingdetail = "{}**Status:** {}\n".format(findingdetail, result.get('Status'))

                 deeplink = "[{}]({})".format(result.get('DeepLink'), result.get('DeepLink'))
-                findingdetail = "{}**Finding Link:** {}\n\n".format(findingdetail, deeplink)
+                findingdetail = "{}**Finding Link:** {}\n".format(findingdetail, deeplink)

                 if self.mode == 'detailed':
                     self.process_result_detailed(dupes, findingdetail, query, result, find_date)
@@ -106,23 +106,23 @@ def get_findings(self, filename, test):
     # Create the finding and add it into the dupes list
     # If a vuln with the same file_path was found before, updates the description
     def process_result_file_name_aggregated(self, dupes, findingdetail, query, result, find_date):
-        name, cwe, categories = self.getQueryElements(query)
+        name, cwe, categories, queryId = self.getQueryElements(query)
         titleStart = query.get('name').replace('_', ' ')
         description, lastPathnode = self.get_description_file_name_aggregated(query, result)
         sinkFilename = lastPathnode.find('FileName').text
         title = "{} ({})".format(titleStart, ntpath.basename(sinkFilename))
         false_p = result.get('FalsePositive')
-        aggregateKeys = "{}{}{}{}".format(categories, cwe, name, sinkFilename)
+        sev = result.get('Severity')
+        aggregateKeys = "{}{}{}".format(cwe, sev, sinkFilename)

         if not(aggregateKeys in dupes):
-            sev = result.get('Severity')
             find = Finding(title=title,
                            cwe=int(cwe),
                            test=self.test,
                            # this may be overwritten later by another member of the aggregate, see "else" below
                            false_p=(false_p == "True"),
                            # Concatenates the query information with this specific finding information
-                           description=findingdetail + '-----\n' + description,
+                           description=findingdetail + description,
                            severity=sev,
                            mitigation=self.mitigation,
                            impact=self.impact,
@@ -132,13 +132,17 @@ def process_result_file_name_aggregated(self, dupes, findingdetail, query, result, find_date):
                            url='N/A',
                            date=find_date,
                            static_finding=True,
-                           nb_occurences=1)
+                           nb_occurences=1,
+                           vuln_id_from_tool=queryId)
             dupes[aggregateKeys] = find
         else:
             # We have already created a finding for this aggregate: updates the description and the nb_occurences
             find = dupes[aggregateKeys]
-            find.description = "{}\n-----\n{}".format(find.description, description)
             find.nb_occurences = find.nb_occurences + 1
+            if find.nb_occurences == 2:
+                find.description = "### 1. {}\n{}".format(find.title, find.description)
+            find.description = "{}\n\n-----\n### {}. {}\n{}\n{}".format(find.description, find.nb_occurences, title, findingdetail, description)
+            find.vuln_id_from_tool = "{},{}".format(find.vuln_id_from_tool, queryId)
             # If at least one of the findings in the aggregate is exploitable, the defectdojo finding should not be "false positive"
             if(false_p == "False"):
                 dupes[aggregateKeys].false_p = False
@@ -156,14 +160,14 @@ def get_description_file_name_aggregated(self, query, result):
                 firstPathnode = False
         # At this point we have iterated over all path nodes (function calls) and pathnode is at the sink of the vulnerability
         sinkFilename, sinkLineNumber, sinkObject = self.get_pathnode_elements(pathnode)
-        description = "<b>Source filename: </b>{}\n<b>Source line number: </b> {}\n<b>Source object: </b> {}".format(sourceFilename, sourceLineNumber, sourceObject)
-        description = "{}\n\n<b>Sink filename: </b>{}\n<b>Sink line number: </b> {}\n<b>Sink object: </b> {}".format(description, sinkFilename, sinkLineNumber, sinkObject)
+        description = "<b>Source file: </b>{} (line {})\n<b>Source object: </b> {}".format(sourceFilename, sourceLineNumber, sourceObject)
+        description = "{}\n<b>Sink file: </b>{} (line {})\n<b>Sink object: </b> {}".format(description, sinkFilename, sinkLineNumber, sinkObject)
         return description, pathnode

     # Process one result = one pathId for scanner "Checkmarx Scan detailed"
     # Create the finding and add it into the dupes list
     def process_result_detailed(self, dupes, findingdetail, query, result, find_date):
-        name, cwe, categories = self.getQueryElements(query)
+        name, cwe, categories, queryId = self.getQueryElements(query)
         title = ''
         sev = result.get('Severity')
         title = query.get('name').replace('_', ' ')
@@ -214,7 +218,8 @@ def process_result_detailed(self, dupes, findingdetail, query, result, find_date):
                            sast_source_object=sourceObject,
                            sast_sink_object=sinkObject,
                            sast_source_line=sourceLineNumber,
-                           sast_source_file_path=sourceFilename)
+                           sast_source_file_path=sourceFilename,
+                           vuln_id_from_tool=queryId)
             dupes[aggregateKeys] = find

     # Return filename, lineNumber and object (function/parameter...) for a given pathnode
@@ -243,6 +248,7 @@ def getQueryElements(self, query):
        categories = ''
        name = query.get('name')
        cwe = query.get('cweId')
+       queryId = query.get('id')
        if query.get('categories') is not None:
            categories = query.get('categories')
-       return name, cwe, categories
+       return name, cwe, categories, queryId
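
For reference, a minimal sketch of what the updated getQueryElements reads from a Checkmarx <Query> element. The XML snippet and its attribute values are invented; only the attribute names (name, cweId, categories, id) come from the code above.

    import xml.etree.ElementTree as ET

    # Made-up <Query> element; attribute names match those read by getQueryElements.
    xml_snippet = '<Query id="591" name="Reflected_XSS" cweId="79" categories="OWASP Top 10 2017;A7-Cross-Site Scripting (XSS)"/>'
    query = ET.fromstring(xml_snippet)

    name = query.get('name')          # "Reflected_XSS"
    cwe = query.get('cweId')          # "79"
    queryId = query.get('id')         # "591", now stored in vuln_id_from_tool
    categories = query.get('categories') or ''

    print(name, cwe, categories, queryId)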