Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add CVE support to Snyk datasource #1405

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions vulntotal/datasources/snyk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import logging
from typing import Iterable
from urllib.parse import quote
from urllib.parse import unquote_plus

import requests
from bs4 import BeautifulSoup
from packageurl import PackageURL

from vulntotal.validator import DataSource
from vulntotal.validator import InvalidCVEError
from vulntotal.validator import VendorData
from vulntotal.vulntotal_utils import snyk_constraints_satisfied

Expand Down Expand Up @@ -70,6 +72,38 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
if advisory_html:
yield parse_html_advisory(advisory_html, snyk_id, affected, purl)

def datasource_advisory_from_cve(self, cve: str) -> Iterable[VendorData]:
    """
    Fetch advisories from Snyk for a given CVE.

    Parameters:
        cve : CVE ID; must start with "CVE-" (checked case-insensitively).

    Yields:
        VendorData instance containing advisory information, one for each
        Snyk vulnerability whose advisory page could be fetched.

    Raises:
        InvalidCVEError: if ``cve`` does not start with "CVE-".
    """
    if not cve.upper().startswith("CVE-"):
        raise InvalidCVEError

    package_list = generate_payload_from_cve(cve)
    response = self.fetch(package_list)
    self._raw_dump = [response]

    # get list of vulnerabilities for cve id; skip parsing when nothing was fetched
    vulns_list = parse_cve_advisory_html(response) if response else None

    # The parser reports "no results table" with a falsy value; there is
    # nothing to yield in that case, so stop instead of failing on .items().
    if not vulns_list:
        return

    # for each vulnerability get fixed version from snyk_id_url, get affected version from package_advisory_url
    for snyk_id, package_advisory_url in vulns_list.items():
        package_advisories_list = self.fetch(package_advisory_url)
        package_advisories = extract_html_json_advisories(package_advisories_list)
        affected_versions = package_advisories[snyk_id]
        advisory_payload = generate_advisory_payload(snyk_id)
        advisory_html = self.fetch(advisory_payload)
        self._raw_dump.append(advisory_html)
        purl = generate_purl(package_advisory_url)
        if advisory_html:
            yield parse_html_advisory(advisory_html, snyk_id, affected_versions, purl)

@classmethod
def supported_ecosystem(cls):
return {
Expand Down Expand Up @@ -132,6 +166,53 @@ def generate_package_advisory_url(purl):
)


def generate_purl(package_advisory_url):
"""
Generates purl from Package advisory url.

Parameters:
package_advisory_url: URL of the package on Snyk.

Returns:
A PackageURL instance representing the package
"""
package_advisory_url = unquote_plus(
package_advisory_url.replace("https://security.snyk.io/package/", "")
)

package_url_split = package_advisory_url.split("/")
pkg_type = package_url_split[0]

pkg_name = None
namespace = None
version = None
qualifiers = {}

if pkg_type == "maven":
pkg_name = package_url_split[1].split(":")[1]
namespace = package_url_split[1].split(":")[0]

elif pkg_type in ("golang", "composer"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PURL creation for golang needs to be properly handled. Take a look at the example below.

For the snyk url https://security.snyk.io/package/golang/github.com%2Fgoauthentik%2Fauthentik%2Fauthentik%2Fproviders%2Foauth2%2Fviews the corresponding purl would be

PackageURL(
    type='golang',
    namespace='github.com/goauthentik/authentik/authentik/providers/oauth2',
    name='views',
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for highlighting this. I have modified the code so that namespace for Go packages is captured correctly.

if package_url_split[1] == "github.com":
pkg_name = package_url_split[-2]
namespace = f"{package_url_split[1]}/{package_url_split[2]}"
version = package_url_split[-1]
else:
pkg_name = package_url_split[-1]
namespace = package_url_split[-2]

elif pkg_type == "linux":
pkg_name = package_url_split[-1]
qualifiers["distro"] = package_url_split[1]

else:
shravankshenoy marked this conversation as resolved.
Show resolved Hide resolved
pkg_name = package_url_split[-1]

return PackageURL(
type=pkg_type, name=pkg_name, namespace=namespace, version=version, qualifiers=qualifiers
shravankshenoy marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must explicitly handle the PURL creation for each ecosystem from the Snyk URL.
For a better understanding of how to create a PURL for a particular ecosystem, you should check https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sharing this. I have tried my best to make sure the purl-spec is followed properly now, but let me know if I missed something.

)


def extract_html_json_advisories(package_advisories):
"""
Extract vulnerability information from HTML or JSON advisories.
Expand Down Expand Up @@ -204,9 +285,41 @@ def parse_html_advisory(advisory_html, snyk_id, affected, purl) -> VendorData:
)


def parse_cve_advisory_html(cve_advisory_html):
    """
    Parse CVE HTML advisory from Snyk and extract list of vulnerabilities and corresponding packages for that CVE.

    Parameters:
        cve_advisory_html: A string of HTML containing the vulnerabilities for given CVE.

    Returns:
        A dictionary with each item representing a vulnerability. Key of each item is the SNYK_ID and value is the package advisory url on snyk website.
        The dictionary is empty when the input is empty or the page has no vulnerabilities table.
    """
    vulns_list = {}

    # Nothing was fetched: do not hand None/"" to the HTML parser.
    if not cve_advisory_html:
        return vulns_list

    cve_advisory_soup = BeautifulSoup(cve_advisory_html, "html.parser")
    vulns_table = cve_advisory_soup.find("tbody", class_="vue--table__tbody")
    if not vulns_table:
        # Always return a dictionary so callers can iterate the result directly.
        return vulns_list

    for row in vulns_table.find_all("tr", class_="vue--table__row"):
        anchors = row.find_all("a", {"class": "vue--anchor"})
        # Only rows with exactly two anchors are used: the first is read as
        # the vulnerability link and the second as the package link.
        if len(anchors) != 2:
            continue
        # NOTE(review): takes the second "/"-separated segment of the href,
        # which presumably is the Snyk ID for hrefs shaped like
        # "vuln/<SNYK_ID>" -- confirm against the live page markup.
        snyk_id = anchors[0]["href"].split("/")[1]
        package_advisory_url = f"https://security.snyk.io{anchors[1]['href']}"
        vulns_list[snyk_id] = package_advisory_url

    return vulns_list


def is_purl_in_affected(version, affected):
    """
    Return True if ``version`` satisfies at least one range in ``affected``,
    as decided by ``snyk_constraints_satisfied``; return False otherwise.
    """
    for candidate_range in affected:
        if snyk_constraints_satisfied(candidate_range, version):
            return True
    return False


def generate_advisory_payload(snyk_id):
    """
    Build the URL of the Snyk advisory page for the given Snyk vulnerability ID.
    """
    advisory_base = "https://security.snyk.io/vuln/"
    return "{}{}".format(advisory_base, snyk_id)


def generate_payload_from_cve(cve_id):
    """
    Build the Snyk vulnerability search URL for the given CVE ID.

    Parameters:
        cve_id: CVE ID to search for.

    Returns:
        The search URL as a string. The CVE ID is percent-encoded so that
        characters such as "&", "#", "=" or spaces cannot alter the query
        string; well-formed IDs like "CVE-2021-44228" are left unchanged.
    """
    # safe="" also encodes "/" so the value stays inside the "search" parameter.
    encoded_cve = quote(str(cve_id), safe="")
    return f"https://security.snyk.io/vuln?search={encoded_cve}"
Loading
Loading