Skip to content

Commit

Permalink
back to enum
Browse files Browse the repository at this point in the history
  • Loading branch information
Vlad Monea committed Sep 7, 2022
1 parent df1bad9 commit 5a3e28f
Showing 1 changed file with 32 additions and 34 deletions.
66 changes: 32 additions & 34 deletions hubblestack/utils/signing.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import re
import io as cStringIO
from binascii import b2a_base64, a2b_base64
from dataclasses import dataclass
from enum import Enum

import hubblestack.utils.platform
Expand Down Expand Up @@ -156,8 +155,7 @@ def check_verif_timestamp(target, dampener_limit=None):
return False


@dataclass
class STATUS:
class STATUS(Enum):
"""
Status code strings
"""
Expand Down Expand Up @@ -232,7 +230,7 @@ def split_certs(fh):
try:
yield ossl.load_certificate(ossl.FILETYPE_PEM, ret)
except Exception:
status = STATUS.UNKNOWN
status = STATUS.UNKNOWN.value
if check_verif_timestamp(fh):
log_level = log.warning
log_level(
Expand Down Expand Up @@ -299,11 +297,11 @@ class X509AwareCertBucket:
public_crt = tuple()

def authenticate_cert(self):
if any(i.status == STATUS.FAIL for i in self.public_crt):
return STATUS.FAIL
if all(i.status == STATUS.VERIFIED for i in self.public_crt):
return STATUS.VERIFIED
return STATUS.UNKNOWN
if any(i.status == STATUS.FAIL.value for i in self.public_crt):
return STATUS.FAIL.value
if all(i.status == STATUS.VERIFIED.value for i in self.public_crt):
return STATUS.VERIFIED.value
return STATUS.UNKNOWN.value

def __init__(self, public_crt=None, ca_crt=None, extra_crt=None):
try:
Expand Down Expand Up @@ -370,7 +368,7 @@ def __init__(self, public_crt=None, ca_crt=None, extra_crt=None):
log_level = log.debug
if check_verif_timestamp(digest, dampener_limit=seconds_day):
log_level = log.splunk
status = STATUS.VERIFIED
status = STATUS.VERIFIED.value
str_ca = stringify_cert_files(ca_crt)
log_level(
'ca cert | file: "%s" | status: %s | digest "%s" | added to verify store', str_ca, status, digest
Expand All @@ -387,21 +385,21 @@ def __init__(self, public_crt=None, ca_crt=None, extra_crt=None):
ossl.X509StoreContext(self.store, i).verify_certificate()
self.store.add_cert(i) # add to trusted keyring
self.trusted.append(digest)
status = STATUS.VERIFIED
status = STATUS.VERIFIED.value
except ossl.X509StoreContextError as exception_object:
status = STATUS.FAIL
status = STATUS.FAIL.value
if check_verif_timestamp(digest, dampener_limit=seconds_day):
log_level = log.splunk
if status == STATUS.FAIL:
if status == STATUS.FAIL.value:
log_level = log.error
elif status == STATUS.UNKNOWN:
elif status == STATUS.UNKNOWN.value:
log_level = log.error
str_untrusted = stringify_cert_files(untrusted_crt)
log_level('intermediate certs | file: "%s" | status: %s | digest "%s"', str_untrusted, status, digest)

self.public_crt = list()
for i in public_crt:
status = STATUS.FAIL
status = STATUS.FAIL.value
digest = i.digest("sha1")
if digest in already:
continue
Expand All @@ -410,13 +408,13 @@ def __init__(self, public_crt=None, ca_crt=None, extra_crt=None):
log_level = log.debug
try:
ossl.X509StoreContext(self.store, i).verify_certificate()
status = STATUS.VERIFIED
status = STATUS.VERIFIED.value
self.trusted.append(digest)
if check_verif_timestamp(digest, dampener_limit=seconds_day):
log_level = log.splunk
if status == STATUS.FAIL:
if status == STATUS.FAIL.value:
log_level = log.error
elif status == STATUS.UNKNOWN:
elif status == STATUS.UNKNOWN.value:
log_level = log.error
str_public = stringify_cert_files(public_crt)
log_level('public cert | file: "%s" | status : "%s" | digest: "%s"', str_public, status, digest)
Expand All @@ -430,16 +428,16 @@ def __init__(self, public_crt=None, ca_crt=None, extra_crt=None):
# X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY 20
# X509_V_ERR_CERT_UNTRUSTED 27
# X509_V_ERR_UNABLE_TO_GET_CRL_ISSUER 33
status = STATUS.UNKNOWN
status = STATUS.UNKNOWN.value
if code in (10, 12):
# X509_V_ERR_CERT_HAS_EXPIRED 10
# X509_V_ERR_CRL_HAS_EXPIRED 12
status = STATUS.FAIL
status = STATUS.FAIL.value
if check_verif_timestamp(digest, dampener_limit=seconds_day):
log_level = log.splunk
if status == STATUS.FAIL:
if status == STATUS.FAIL.value:
log_level = log.error
elif status == STATUS.UNKNOWN:
elif status == STATUS.UNKNOWN.value:
log_level = log.error
str_public = stringify_cert_files(public_crt)
log_level(
Expand Down Expand Up @@ -700,7 +698,7 @@ def verify_signature(
ca_crt = Options.ca_crt

if not (fname and sfname):
status = STATUS.UNKNOWN
status = STATUS.UNKNOWN.value
log_info("!(fname=%s and sfname=%s) => status=%s", fname, sfname, status)
return status

Expand All @@ -710,7 +708,7 @@ def verify_signature(
with open(sfname, "r") as fh:
sig = decode_pem(fh.read())
except IOError:
status = STATUS.UNKNOWN
status = STATUS.UNKNOWN.value
log_error('%s | file "%s" | status: %s ', short_fname, fname, status)
return status

Expand Down Expand Up @@ -765,7 +763,7 @@ def verify_signature(
sha256sum,
txt,
)
status = STATUS.FAIL
status = STATUS.FAIL.value
log_error(
'verify_signature(%s, %s) UnverifiedSignature | status: %s | sha256sum: "%s" | (4)',
fname,
Expand Down Expand Up @@ -855,7 +853,7 @@ def verify_files(targets, mfname=None, sfname=None, public_crt=None, ca_crt=None
xlate[target] = otarget
if target in digests or target in (mfname, sfname):
continue
digests[target] = STATUS.UNKNOWN
digests[target] = STATUS.UNKNOWN.value
# populate digests with the hashes from the MANIFEST
if os.path.isfile(mfname):
with open(mfname, "r") as fh:
Expand All @@ -875,11 +873,11 @@ def verify_files(targets, mfname=None, sfname=None, public_crt=None, ca_crt=None
new_hash = hash_target(htname)

log_level = log.debug
if digest == STATUS.UNKNOWN:
if digest == STATUS.UNKNOWN.value:
# digests[vfname] is either UNKNOWN (from the targets population)
# or it's a digest from the MANIFEST. If UNKNOWN, we have nothing to compare
# so we return UNKNOWN
status = STATUS.UNKNOWN
status = STATUS.UNKNOWN.value
# check to see if the the status of a failed target has been sent is the last
# x seconds, we reset time and set log level accordingly. the same for FAIL
elif digest == new_hash:
Expand All @@ -892,11 +890,11 @@ def verify_files(targets, mfname=None, sfname=None, public_crt=None, ca_crt=None
else:
# We do have a MANIFEST entry and it doesn't match: FAIL with or
# without a matching SIGNATURE
status = STATUS.FAIL
status = STATUS.FAIL.value
if check_verif_timestamp(digest):
if status == STATUS.FAIL:
if status == STATUS.FAIL.value:
log_level = log.error
elif status == STATUS.UNKNOWN:
elif status == STATUS.UNKNOWN.value:
log_level = log.error
# logs according to the STATUS of target file
log_level(
Expand Down Expand Up @@ -964,10 +962,10 @@ def inner(path, saltenv, *a, **kwargs):

log.debug("verify: %s", dict(**verify_res))

vrg = verify_res.get(normalized_path, STATUS.UNKNOWN)
if vrg == STATUS.VERIFIED:
vrg = verify_res.get(normalized_path, STATUS.UNKNOWN.value)
if vrg == STATUS.VERIFIED.value:
return file_path
if vrg == STATUS.UNKNOWN and not Options.require_verify:
if vrg == STATUS.UNKNOWN.value and not Options.require_verify:
return file_path
log.debug("claiming not found: %s (%s)", path, file_path)
if log.isEnabledFor(logging.DEBUG):
Expand Down

0 comments on commit 5a3e28f

Please sign in to comment.