Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added vuln-scan subcommand #65

Merged
merged 17 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/unit-testing.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name: "Unit Testing"
on: [push]
jobs:
test:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v3
- run: python3 -m unittest
2 changes: 1 addition & 1 deletion wordfence/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__all__ = ['licensing', 'exceptions', 'noc4', 'noc1']
__all__ = ['licensing', 'exceptions', 'noc4', 'noc1', 'intelligence']
6 changes: 5 additions & 1 deletion wordfence/api/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,9 @@ def __init__(
internal_message: str,
public_message: Optional[str] = None
):
super().__init__(f'{internal_message}: {public_message}')
if public_message is not None:
message = f'{internal_message}: {public_message}'
else:
message = internal_message
super().__init__(message)
self.public_message = public_message
305 changes: 305 additions & 0 deletions wordfence/api/intelligence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
import requests
from requests.exceptions import RequestException
from enum import Enum
from typing import Callable, Dict, Type, Optional

from .exceptions import ApiException
from ..util.validation import Validator, DictionaryValidator, ListValidator, \
AllowedValueValidator, OptionalValueValidator, NumberValidator, \
ValidationException
from ..intel.vulnerabilities import Vulnerability, ScannerVulnerability, \
ProductionVulnerability, Software, ProductionSoftware, SoftwareType, \
VersionRange, Cwe, Cvss, CopyrightInformation, Copyright


DEFAULT_BASE_URL = 'https://www.wordfence.com/api/intelligence/v2'
DEFAULT_TIMEOUT = 30


def get_base_vulnerability_feed_validator() -> Validator:
return DictionaryValidator(
validator=DictionaryValidator({
'id': str,
'title': str,
'software': ListValidator(DictionaryValidator({
'type': AllowedValueValidator({
'core', 'plugin', 'theme'
}),
'name': str,
'slug': str,
'affected_versions': DictionaryValidator(
validator=DictionaryValidator({
'from_version': str,
'from_inclusive': bool,
'to_version': str,
'to_inclusive': bool
})),
'patched': bool,
'patched_versions': ListValidator(str)
})),
'informational': OptionalValueValidator(bool),
'references': ListValidator(str),
'published': OptionalValueValidator(str),
'copyrights': DictionaryValidator(
expected={
'message': str,
},
validator=DictionaryValidator({
'notice': str,
'license': str,
'license_url': str
}),
allow_empty=True
)
}, optional_keys={'informational'})
)


def get_production_vulnerability_feed_validator() -> Validator:
validator = get_base_vulnerability_feed_validator()
validator.validator.add_field('description', str)
validator.validator.add_field(
'cwe',
OptionalValueValidator(DictionaryValidator({
'id': int,
'name': str,
'description': str
}))
)
validator.validator.add_field(
'cvss',
OptionalValueValidator(DictionaryValidator({
'vector': str,
'score': NumberValidator(),
'rating': str
}))
)
validator.validator.add_field('cve', OptionalValueValidator(str))
validator.validator.add_field('cve_link', OptionalValueValidator(str))
validator.validator.add_field('researchers', ListValidator(str))
validator.validator.add_field('updated', OptionalValueValidator(str))
validator.validator.expected['software'].expected.add_field(
'remediation',
str
)
return validator


class VulnerabilityParser:

def __init__(
self,
type: Type[Vulnerability],
software_type: Type[Software] = Software
):
self.type = type
self.software_type = software_type

def extract_vulnerability_properties(self, record: dict) -> dict:
properties = {}
properties['identifier'] = record['id']
properties['title'] = record['title']
if 'informational' in record:
properties['informational'] = record['informational']
properties['references'] = record['references']
properties['published'] = record['published']
return properties

def parse_version_range(self, record: dict) -> VersionRange:
return VersionRange(
from_version=record['from_version'],
from_inclusive=record['from_inclusive'],
to_version=record['to_version'],
to_inclusive=record['to_inclusive']
)

def extract_software_properties(self, record: dict) -> dict:
properties = {}
properties['type'] = SoftwareType(record['type'])
properties['name'] = record['name']
properties['slug'] = record['slug']
properties['patched'] = record['patched']
properties['patched_versions'] = record['patched_versions']
affected_versions = {}
for key, affected in record['affected_versions'].items():
range = self.parse_version_range(affected)
affected_versions[key] = range
properties['affected_versions'] = affected_versions
return properties

def parse_copyright(self, record: dict) -> Copyright:
return Copyright(
notice=record['notice'],
license=record['license'],
license_url=record['license']
)

def parse_copyright_information(self, record: dict) \
-> Optional[CopyrightInformation]:
copyrights = record['copyrights']
if len(copyrights) > 0:
info = CopyrightInformation()
if 'message' in copyrights:
info.message = copyrights['message']
for key, copyright in copyrights.items():
if key == 'message':
continue
info.copyrights[key] = self.parse_copyright(copyright)
return info
return None

def parse(self, record: dict) -> Vulnerability:
vulnerability = self.type(**self.extract_vulnerability_properties(
record
))
for software in record['software']:
vulnerability.software.append(
self.software_type(**self.extract_software_properties(
software
))
)
vulnerability.copyright_information = self.parse_copyright_information(
record
)
return vulnerability


class ScannerVulnerabilityParser(VulnerabilityParser):

def __init__(self):
super().__init__(
type=ScannerVulnerability
)

def parse(self, record: dict) -> ScannerVulnerability:
return super().parse(record)


class ProductionVulnerabilityParser(VulnerabilityParser):

def __init__(self):
super().__init__(
type=ProductionVulnerability,
software_type=ProductionSoftware
)

def extract_vulnerability_properties(self, record: dict) -> dict:
properties = super().extract_vulnerability_properties(record)
properties['description'] = record['description']
properties['cve'] = record['cve']
properties['cve_link'] = record['cve_link']
properties['researchers'] = record['researchers']
properties['updated'] = record['updated']
return properties

def extract_software_properties(self, record: dict) -> dict:
properties = super().extract_software_properties(record)
properties['remediation'] = record['remediation']
return properties

def parse_cwe(self, record: dict) -> Cwe:
return Cwe(
identifier=record['id'],
name=record['name'],
description=record['description']
)

def parse_cvss(self, record: dict) -> Cvss:
return Cvss(
vector=record['vector'],
score=record['score'],
rating=record['rating']
)

def parse(self, record: dict) -> ProductionVulnerability:
vulnerability = super().parse(record)
if record['cwe'] is not None:
vulnerability.cwe = self.parse_cwe(record['cwe'])
if record['cvss'] is not None:
vulnerability.cvss = self.parse_cvss(record['cvss'])
return vulnerability


class VulnerabilityFeedVariant(Enum):
SCANNER = (
'scanner',
get_base_vulnerability_feed_validator,
ScannerVulnerabilityParser()
)
PRODUCTION = (
'production',
get_production_vulnerability_feed_validator,
ProductionVulnerabilityParser()
)

def __init__(
self,
path: str,
validator_factory: Callable[[], Validator],
parser: VulnerabilityParser
):
self.path = path
self.validator_factory = validator_factory
self.validator = None
self.parser = parser

def get_validator(self) -> Validator:
if self.validator is None:
self.validator = self.validator_factory()
return self.validator

@classmethod
def for_path(cls, path):
for variant in cls:
if variant.path == path:
return variant
raise ValueError(f'Unrecognized vulnerability feed variant: {path}')


class Client:

def __init__(
self,
base_url: Optional[str] = None,
timeout: int = DEFAULT_TIMEOUT
):
self.base_url = base_url if base_url is not None else DEFAULT_BASE_URL
self.timeout = timeout

def _build_url(self, path: str) -> str:
return self.base_url.rstrip('/') + path

def fetch_vulnerability_feed(
self,
variant: VulnerabilityFeedVariant
) -> Dict[str, Vulnerability]:
url = self._build_url(f'/vulnerabilities/{variant.path}')
try:
response = requests.get(url, timeout=self.timeout)
response.raise_for_status()
data = response.json()
variant.get_validator().validate(data)
vulnerabilities = {}
for key, record in data.items():
vulnerabilities[key] = (variant.parser.parse(record))
return vulnerabilities
except RequestException as e:
raise ApiException('Wordfence Intelligence API request failed') \
from e
except ValidationException as e:
raise ApiException(
'Wordfence Intelligence API response validation failed'
) \
from e

def fetch_scanner_vulnerability_feed(
self
) -> Dict[str, ScannerVulnerability]:
return self.fetch_vulnerability_feed(VulnerabilityFeedVariant.SCANNER)

def fetch_production_vulnerability_feed(
self
) -> Dict[str, ProductionVulnerability]:
return self.fetch_vulnerability_feed(
VulnerabilityFeedVariant.PRODUCTION
)
7 changes: 6 additions & 1 deletion wordfence/api/licensing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ def __eq__(self, other):


class LicenseRequiredException(ApiException):
pass

def __init__(self):
super().__init__(
'License required',
'A valid Wordfence CLI license is required'
)


class LicenseSpecific:
Expand Down
Loading