diff --git a/vulntotal/datasources/snyk.py b/vulntotal/datasources/snyk.py
index 4af4fa619..5b2418071 100644
--- a/vulntotal/datasources/snyk.py
+++ b/vulntotal/datasources/snyk.py
@@ -10,12 +10,14 @@
import logging
from typing import Iterable
from urllib.parse import quote
+from urllib.parse import unquote_plus
import requests
from bs4 import BeautifulSoup
from packageurl import PackageURL
from vulntotal.validator import DataSource
+from vulntotal.validator import InvalidCVEError
from vulntotal.validator import VendorData
from vulntotal.vulntotal_utils import snyk_constraints_satisfied
@@ -70,6 +72,38 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
if advisory_html:
yield parse_html_advisory(advisory_html, snyk_id, affected, purl)
+ def datasource_advisory_from_cve(self, cve: str) -> Iterable[VendorData]:
+ """
+ Fetch advisories from Snyk for a given CVE.
+
+ Parameters:
+ cve : CVE ID
+
+ Yields:
+ VendorData instance containing advisory information.
+ """
+ if not cve.upper().startswith("CVE-"):
+ raise InvalidCVEError
+
+ package_list = generate_payload_from_cve(cve)
+ response = self.fetch(package_list)
+ self._raw_dump = [response]
+
+ # get list of vulnerabilities for cve id
+ vulns_list = parse_cve_advisory_html(response)
+
+ # for each vulnerability get fixed version from snyk_id_url, get affected version from package_advisory_url
+ for snyk_id, package_advisory_url in vulns_list.items():
+ package_advisories_list = self.fetch(package_advisory_url)
+ package_advisories = extract_html_json_advisories(package_advisories_list)
+ affected_versions = package_advisories[snyk_id]
+ advisory_payload = generate_advisory_payload(snyk_id)
+ advisory_html = self.fetch(advisory_payload)
+ self._raw_dump.append(advisory_html)
+ purl = generate_purl(package_advisory_url)
+ if advisory_html and purl:
+ yield parse_html_advisory(advisory_html, snyk_id, affected_versions, purl)
+
@classmethod
def supported_ecosystem(cls):
return {
@@ -132,6 +166,61 @@ def generate_package_advisory_url(purl):
)
+def generate_purl(package_advisory_url):
+ """
+ Generates purl from Package advisory url.
+
+ Parameters:
+ package_advisory_url: URL of the package on Snyk.
+
+ Returns:
+ A PackageURL instance representing the package
+ """
+ package_advisory_url = unquote_plus(
+ package_advisory_url.replace("https://security.snyk.io/package/", "")
+ )
+ supported_ecosystems = {v: k for (k, v) in SnykDataSource.supported_ecosystem().items()}
+
+ package_url_split = package_advisory_url.split("/")
+ pkg_type = package_url_split[0]
+
+ pkg_name = None
+ namespace = None
+ qualifiers = {}
+
+ if pkg_type == "maven":
+ pkg_name = package_url_split[1].split(":")[1]
+ namespace = package_url_split[1].split(":")[0]
+
+ elif pkg_type == "composer":
+ pkg_name = package_url_split[-1]
+ namespace = package_url_split[-2]
+
+ elif pkg_type == "golang":
+ pkg_name = package_url_split[-1]
+ namespace = "/".join(package_url_split[1:-1])
+
+ elif pkg_type == "npm":
+ # handle scoped npm packages
+ if "@" in package_advisory_url:
+ namespace = package_url_split[-2]
+
+ pkg_name = package_url_split[-1]
+
+ elif pkg_type == "linux":
+ pkg_name = package_url_split[-1]
+ qualifiers["distro"] = package_url_split[1]
+
+ elif pkg_type in ("cocoapods", "hex", "nuget", "pip", "rubygems", "unmanaged"):
+ pkg_name = package_url_split[-1]
+
+ if pkg_type is None or pkg_name is None:
+ logger.error("Invalid package advisory url, package type or name is missing")
+ return
+
+ return PackageURL(type=supported_ecosystems[pkg_type], name=pkg_name, namespace=namespace)
+
+
def extract_html_json_advisories(package_advisories):
"""
Extract vulnerability information from HTML or JSON advisories.
@@ -204,9 +293,41 @@ def parse_html_advisory(advisory_html, snyk_id, affected, purl) -> VendorData:
)
+def parse_cve_advisory_html(cve_advisory_html):
+ """
+ Parse CVE HTML advisory from Snyk and extract list of vulnerabilities and corresponding packages for that CVE.
+
+ Parameters:
+ advisory_html: A string of HTML containing the vulnerabilities for given CVE.
+
+ Returns:
+ A dictionary with each item representing a vulnerability. Key of each item is the SNYK_ID and value is the package advisory url on snyk website
+ """
+ cve_advisory_soup = BeautifulSoup(cve_advisory_html, "html.parser")
+ vulns_table = cve_advisory_soup.find("tbody", class_="vue--table__tbody")
+ if not vulns_table:
+ return None
+ vulns_rows = vulns_table.find_all("tr", class_="vue--table__row")
+ vulns_list = {}
+
+ for row in vulns_rows:
+ anchors = row.find_all("a", {"class": "vue--anchor"})
+ if len(anchors) != 2:
+ continue
+ snyk_id = anchors[0]["href"].split("/")[1]
+ package_advisory_url = f"https://security.snyk.io{anchors[1]['href']}"
+ vulns_list[snyk_id] = package_advisory_url
+
+ return vulns_list
+
+
def is_purl_in_affected(version, affected):
return any(snyk_constraints_satisfied(affected_range, version) for affected_range in affected)
def generate_advisory_payload(snyk_id):
return f"https://security.snyk.io/vuln/{snyk_id}"
+
+
+def generate_payload_from_cve(cve_id):
+ return f"https://security.snyk.io/vuln?search={cve_id}"
diff --git a/vulntotal/tests/test_data/snyk/html/4.html b/vulntotal/tests/test_data/snyk/html/4.html
new file mode 100644
index 000000000..6ab4ccc0a
--- /dev/null
+++ b/vulntotal/tests/test_data/snyk/html/4.html
@@ -0,0 +1,190 @@
+
+
+
+ Vulnerability DB | Snyk
+
+
+
+
+
diff --git a/vulntotal/tests/test_data/snyk/html/4.html-expected.json b/vulntotal/tests/test_data/snyk/html/4.html-expected.json
new file mode 100644
index 000000000..5f5f93616
--- /dev/null
+++ b/vulntotal/tests/test_data/snyk/html/4.html-expected.json
@@ -0,0 +1,8 @@
+{
+ "SNYK-JAVA-ORGWEBJARSBOWERGITHUBSENTSIN-6146043": "https://security.snyk.io/package/maven/org.webjars.bowergithub.sentsin%3Alayui",
+ "SNYK-JAVA-ORGWEBJARSBOWERGITHUBDIGUOYIHAO-6146042": "https://security.snyk.io/package/maven/org.webjars.bowergithub.diguoyihao%3Alayui",
+ "SNYK-JAVA-ORGWEBJARSBOWER-6146041": "https://security.snyk.io/package/maven/org.webjars.bower%3Alayui",
+ "SNYK-JAVA-ORGWEBJARSBOWERGITHUBLAYUI-6146040": "https://security.snyk.io/package/maven/org.webjars.bowergithub.layui%3Alayui",
+ "SNYK-JAVA-ORGWEBJARS-6146039": "https://security.snyk.io/package/maven/org.webjars%3Alayui",
+ "SNYK-JAVA-ORGWEBJARSNPM-6146038": "https://security.snyk.io/package/maven/org.webjars.npm%3Alayui"
+}
diff --git a/vulntotal/tests/test_data/snyk/html/5.html b/vulntotal/tests/test_data/snyk/html/5.html
new file mode 100644
index 000000000..e52bffcea
--- /dev/null
+++ b/vulntotal/tests/test_data/snyk/html/5.html
@@ -0,0 +1,430 @@
+
+
+
+ Vulnerability DB | Snyk
+
+
+
+
+
diff --git a/vulntotal/tests/test_data/snyk/html/5.html-expected.json b/vulntotal/tests/test_data/snyk/html/5.html-expected.json
new file mode 100644
index 000000000..9b2a495a2
--- /dev/null
+++ b/vulntotal/tests/test_data/snyk/html/5.html-expected.json
@@ -0,0 +1,31 @@
+{
+ "SNYK-SLES155-LIBOPENSSL3-6184655": "https://security.snyk.io/package/linux/sles:15.5/libopenssl3",
+ "SNYK-SLES155-LIBOPENSSL3DEVEL-6184653": "https://security.snyk.io/package/linux/sles:15.5/libopenssl-3-devel",
+ "SNYK-SLES155-OPENSSL3-6184652": "https://security.snyk.io/package/linux/sles:15.5/openssl-3",
+ "SNYK-ALPINE317-OPENSSL-6160001": "https://security.snyk.io/package/linux/alpine:3.17/openssl",
+ "SNYK-ALPINE318-OPENSSL-6160000": "https://security.snyk.io/package/linux/alpine:3.18/openssl",
+ "SNYK-ALPINE319-OPENSSL-6159994": "https://security.snyk.io/package/linux/alpine:3.19/openssl",
+ "SNYK-CENTOS9-OPENSSLPERL-6157924": "https://security.snyk.io/package/linux/centos:9/openssl-perl",
+ "SNYK-RHEL9-OPENSSLPERL-6157922": "https://security.snyk.io/package/linux/rhel:9/openssl-perl",
+ "SNYK-CENTOS9-OPENSSLDEVEL-6157920": "https://security.snyk.io/package/linux/centos:9/openssl-devel",
+ "SNYK-RHEL9-OPENSSLDEVEL-6157919": "https://security.snyk.io/package/linux/rhel:9/openssl-devel",
+ "SNYK-CENTOS9-OPENSSL-6157917": "https://security.snyk.io/package/linux/centos:9/openssl",
+ "SNYK-RHEL9-OPENSSL-6157915": "https://security.snyk.io/package/linux/rhel:9/openssl",
+ "SNYK-CENTOS9-OPENSSLLIBS-6157913": "https://security.snyk.io/package/linux/centos:9/openssl-libs",
+ "SNYK-RHEL9-OPENSSLLIBS-6157911": "https://security.snyk.io/package/linux/rhel:9/openssl-libs",
+ "SNYK-CENTOS9-EDK2TOOLSDOC-6157909": "https://security.snyk.io/package/linux/centos:9/edk2-tools-doc",
+ "SNYK-RHEL9-EDK2TOOLSDOC-6157908": "https://security.snyk.io/package/linux/rhel:9/edk2-tools-doc",
+ "SNYK-CENTOS9-EDK2TOOLS-6157906": "https://security.snyk.io/package/linux/centos:9/edk2-tools",
+ "SNYK-RHEL9-EDK2TOOLS-6157904": "https://security.snyk.io/package/linux/rhel:9/edk2-tools",
+ "SNYK-CENTOS9-EDK2AARCH64-6157902": "https://security.snyk.io/package/linux/centos:9/edk2-aarch64",
+ "SNYK-RHEL9-EDK2AARCH64-6157900": "https://security.snyk.io/package/linux/rhel:9/edk2-aarch64",
+ "SNYK-CENTOS9-EDK2-6157898": "https://security.snyk.io/package/linux/centos:9/edk2",
+ "SNYK-RHEL9-EDK2-6157896": "https://security.snyk.io/package/linux/rhel:9/edk2",
+ "SNYK-CENTOS9-EDK2OVMF-6157895": "https://security.snyk.io/package/linux/centos:9/edk2-ovmf",
+ "SNYK-RHEL9-EDK2OVMF-6157893": "https://security.snyk.io/package/linux/rhel:9/edk2-ovmf",
+ "SNYK-PYTHON-PYOPENSSL-6157250": "https://security.snyk.io/package/pip/pyopenssl",
+ "SNYK-RUBY-OPENSSL-6157246": "https://security.snyk.io/package/rubygems/openssl",
+ "SNYK-RUST-OPENSSLSRC-6157249": "https://security.snyk.io/package/cargo/openssl-src",
+ "SNYK-PYTHON-CRYPTOGRAPHY-6157248": "https://security.snyk.io/package/pip/cryptography",
+ "SNYK-DEBIANUNSTABLE-OPENSSL-6157245": "https://security.snyk.io/package/linux/debian:unstable/openssl"
+}
diff --git a/vulntotal/tests/test_snyk.py b/vulntotal/tests/test_snyk.py
index f4f221b39..871d8a27b 100644
--- a/vulntotal/tests/test_snyk.py
+++ b/vulntotal/tests/test_snyk.py
@@ -52,6 +52,58 @@ def test_generate_package_advisory_url(self):
]
util_tests.check_results_against_expected(results, expected)
+ def test_generate_purl(self):
+ package_advisory_urls = [
+ "https://security.snyk.io/package/pip/jinja2",
+ "https://security.snyk.io/package/maven/org.apache.tomcat%3Atomcat",
+ "https://security.snyk.io/package/npm/semver-regex",
+ "https://security.snyk.io/package/npm/@urql%2Fnext",
+ "https://security.snyk.io/package/npm/@lobehub%2Fchat",
+ "https://security.snyk.io/package/npm/meshcentral",
+ "https://security.snyk.io/package/composer/bolt%2Fcore",
+ "https://security.snyk.io/package/linux/debain:11/trafficserver",
+ "https://security.snyk.io/package/linux/almalinux:8/rpm-plugin-fapolicyd",
+ "https://security.snyk.io/package/nuget/moment.js",
+ "https://security.snyk.io/package/cocoapods/ffmpeg",
+ "https://security.snyk.io/package/hex/coherence",
+ "https://security.snyk.io/package/rubygems/log4j-jars",
+ "https://security.snyk.io/package/golang/github.com%2Fgrafana%2Fgrafana%2Fpkg%2Fservices%2Fsqlstore%2Fmigrator",
+ "https://security.snyk.io/package/golang/github.com%2Fanswerdev%2Fanswer%2Finternal%2Frepo%2Factivity",
+ "https://security.snyk.io/package/golang/go.etcd.io%2Fetcd%2Fv3%2Fauth",
+ "https://security.snyk.io/package/golang/gopkg.in%2Fkubernetes%2Fkubernetes.v0%2Fpkg%2Fregistry%2Fpod",
+ "https://security.snyk.io/package/golang/gogs.io%2Fgogs%2Finternal%2Fdb",
+ "https://security.snyk.io/package/golang/golang.org%2Fx%2Fcrypto%2Fssh",
+ ]
+
+ results = [
+ PackageURL.to_string(snyk.generate_purl(package_advisory_url))
+ for package_advisory_url in package_advisory_urls
+ ]
+
+ expected = [
+ "pkg:pypi/jinja2",
+ "pkg:maven/org.apache.tomcat/tomcat",
+ "pkg:npm/semver-regex",
+ "pkg:npm/%40urql/next",
+ "pkg:npm/%40lobehub/chat",
+ "pkg:npm/meshcentral",
+ "pkg:composer/bolt/core",
+ "pkg:linux/trafficserver",
+ "pkg:linux/rpm-plugin-fapolicyd",
+ "pkg:nuget/moment.js",
+ "pkg:cocoapods/ffmpeg",
+ "pkg:hex/coherence",
+ "pkg:gem/log4j-jars",
+ "pkg:golang/github.com/grafana/grafana/pkg/services/sqlstore/migrator",
+ "pkg:golang/github.com/answerdev/answer/internal/repo/activity",
+ "pkg:golang/go.etcd.io/etcd/v3/auth",
+ "pkg:golang/gopkg.in/kubernetes/kubernetes.v0/pkg/registry/pod",
+ "pkg:golang/gogs.io/gogs/internal/db",
+ "pkg:golang/golang.org/x/crypto/ssh",
+ ]
+
+ util_tests.check_results_against_expected(results, expected)
+
def test_parse_html_advisory_0(self):
file = self.get_test_loc("html/0.html")
with open(file) as f:
@@ -91,3 +143,19 @@ def test_parse_html_advisory_3(self):
).to_dict()
expected_file = f"{file}-expected.json"
util_tests.check_results_against_json(result, expected_file)
+
+ def test_parse_cve_advisory_html_0(self):
+ file = self.get_test_loc("html/4.html")
+ with open(file) as f:
+ page = f.read()
+ result = snyk.parse_cve_advisory_html(page)
+ expected_file = f"{file}-expected.json"
+ util_tests.check_results_against_json(result, expected_file)
+
+ def test_parse_cve_advisory_html_1(self):
+ file = self.get_test_loc("html/5.html")
+ with open(file) as f:
+ page = f.read()
+ result = snyk.parse_cve_advisory_html(page)
+ expected_file = f"{file}-expected.json"
+ util_tests.check_results_against_json(result, expected_file)