Skip to content
This repository has been archived by the owner on Mar 24, 2023. It is now read-only.

Added dkim and dmarc to dns boefje #21

Merged
merged 16 commits into from
Jan 31, 2023
Merged
25 changes: 23 additions & 2 deletions boefjes/plugins/kat_dns/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Boefje script for getting dns records"""
import json
import logging
from typing import Union, Tuple, List

import dns.resolver
from dns.name import Name
from dns.resolver import Answer

from boefjes.config import settings
from boefjes.job_models import BoefjeMeta

logger = logging.getLogger(__name__)
Expand All @@ -30,7 +32,9 @@ def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]:
for type_ in dns_record_types:

try:
answer: Answer = dns.resolver.resolve(hostname, type_)
resolver = dns.resolver.Resolver()
resolver.nameservers = [settings.remote_ns]
answer: Answer = resolver.resolve(hostname, type_)
answers.append(answer)
except dns.resolver.NoAnswer:
pass
Expand All @@ -41,7 +45,12 @@ def run(boefje_meta: BoefjeMeta) -> List[Tuple[set, Union[bytes, str]]]:

answers_formatted = [f"RESOLVER: {answer.nameserver}\n{answer.response}" for answer in answers]

return [(set(), "\n\n".join(answers_formatted))]
results = {
"dns_records": "\n\n".join(answers_formatted),
"dmarc_response": get_email_security_records(hostname, "_dmarc"),
"dkim_response": get_email_security_records(hostname, "_domainkey"),
}
return [(set(), json.dumps(results))]


def get_parent_zone_soa(name: Name) -> Answer:
Expand All @@ -56,3 +65,15 @@ def get_parent_zone_soa(name: Name) -> Answer:
name = name.parent()
except dns.name.NoParent:
raise ZoneNotFoundException


def get_email_security_records(hostname: str, record_subdomain: str) -> str:
try:
resolver = dns.resolver.Resolver()
resolver.nameservers = [settings.remote_ns]
answer = resolver.resolve(f"{record_subdomain}.{hostname}", "TXT", raise_on_no_answer=False)
return answer.response.to_text()
except dns.resolver.NXDOMAIN:
return "NXDOMAIN"
except dns.resolver.Timeout:
return "Timeout"
27 changes: 26 additions & 1 deletion boefjes/plugins/kat_dns/normalize.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from ipaddress import IPv4Address, IPv6Address
from typing import Iterator, Union, List, Dict

Expand All @@ -24,6 +25,7 @@
)
from octopoes.models.ooi.dns.zone import Hostname, DNSZone
from octopoes.models.ooi.network import IPAddressV4, IPAddressV6, Network
from octopoes.models.ooi.email_security import DKIMExists, DMARCTXTRecord

from boefjes.job_models import NormalizerMeta

Expand All @@ -35,8 +37,10 @@ def run(normalizer_meta: NormalizerMeta, raw: Union[bytes, str]) -> Iterator[OOI
yield NXDOMAIN(hostname=Reference.from_str(normalizer_meta.raw_data.boefje_meta.input_ooi))
return

results = json.loads(raw)

# parse raw data into dns.response.Message
sections = raw.decode().split("\n\n")
sections = results["dns_records"].split("\n\n")
responses: List[Message] = []
for section in sections:
lines = section.split("\n")
Expand Down Expand Up @@ -155,3 +159,24 @@ def register_record(record: DNSRecord) -> DNSRecord:
yield zone
yield from hostname_store.values()
yield from record_store.values()

# DKIM
dkim_results = results["dkim_response"]
if dkim_results not in ["NXDOMAIN", "Timeout"]:
if "rcode NOERROR" == dkim_results.split("\n")[2]:
yield DKIMExists(
hostname=input_hostname.reference,
)

# DMARC
dmarc_results = results["dmarc_response"]
if dmarc_results not in ["NXDOMAIN", "Timeout"]:
for rrset in from_text(dmarc_results).answer:
for rr in rrset:
rr: Rdata
underdarknl marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(rr, TXT):
yield DMARCTXTRecord(
hostname=input_hostname.fqdn,
value=str(rr).strip('"'),
ttl=rrset.ttl,
)