From 30e098267c61a1ffc45d4a96eabb073bfe7297d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Wa=C5=9B?= Date: Sun, 22 Mar 2020 11:48:31 +0100 Subject: [PATCH] describe class attributes --- Makefile | 2 +- README.md | 28 ++++---- pytm/pytm.py | 177 ++++++++++++++++++++++++++++++++++----------------- 3 files changed, 133 insertions(+), 74 deletions(-) diff --git a/Makefile b/Makefile index ecb3e66..a7b0749 100644 --- a/Makefile +++ b/Makefile @@ -58,7 +58,7 @@ build: setup.py .PHONY: describe describe: - for i in TM Element Server ExternalEntity Datastore Actor Process SetOfProcesses Dataflow Boundary Lambda Finding; do ./tm.py --describe $$i; done + ./tm.py --describe "TM Element Boundary ExternalEntity Actor Lambda Server Process SetOfProcesses Datastore Dataflow" .PHONY: image image: diff --git a/README.md b/README.md index 581e221..8e155e1 100644 --- a/README.md +++ b/README.md @@ -36,21 +36,19 @@ The available properties of an element can be listed by using `--describe` follo ```text (pytm) ➜ pytm git:(master) ✗ ./tm.py --describe Element -Element - OS - check - definesConnectionTimeout - description - dfd - handlesResources - implementsAuthenticationScheme - implementsNonce - inBoundary - inScope - isAdmin - isHardened - name - onAWS +Element class attributes: + OS + definesConnectionTimeout default: False + description + handlesResources default: False + implementsAuthenticationScheme default: False + implementsNonce default: False + inBoundary + inScope Is the element in scope of the threat model, default: True + isAdmin default: False + isHardened default: False + name required + onAWS default: False ``` diff --git a/pytm/pytm.py b/pytm/pytm.py index 992b3cc..aeb0b7b 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -9,8 +9,6 @@ from collections.abc import Iterable from hashlib import sha224 from os.path import dirname -from re import match -from sys import exit, stderr from textwrap import wrap from weakref import WeakKeyDictionary @@ -28,8 +26,11 @@ class var(object): ''' A descriptor that allows setting a value only once ''' - def __init__(self, default, onSet=None): + + def __init__(self, default, required=False, doc="", onSet=None): self.default = default + self.required = required + self.doc = doc self.data = WeakKeyDictionary() self.onSet = onSet @@ -37,6 +38,8 @@ def __get__(self, instance, owner): # when x.d is called we get here # instance = x # owner = type(x) + if instance is None: + return self return self.data.get(instance, self.default) def __set__(self, instance, value): @@ -45,16 +48,17 @@ def __set__(self, instance, value): # value = val if instance in self.data: raise ValueError( - "cannot overwrite {} value with {}, already set to {}".format( - self.__class__.__name__, value, self.data[instance] - ) - ) + "cannot overwrite {} value with {}, already set to {}".format( + self.__class__.__name__, value, self.data[instance] + ) + ) self.data[instance] = value if self.onSet is not None: self.onSet(instance, value) class varString(var): + def __set__(self, instance, value): if not isinstance(value, str): raise ValueError("expecting a String value, got a {}".format(type(value))) @@ -62,6 +66,7 @@ def __set__(self, instance, value): class varBoundary(var): + def __set__(self, instance, value): if not isinstance(value, Boundary): raise ValueError("expecting a Boundary value, got a {}".format(type(value))) @@ -69,6 +74,7 @@ def __set__(self, instance, value): class varBool(var): + def __set__(self, instance, value): if not isinstance(value, bool): raise ValueError("expecting a boolean value, got a {}".format(type(value))) @@ -76,6 +82,7 @@ def __set__(self, instance, value): class varInt(var): + def __set__(self, instance, value): if not isinstance(value, int): raise ValueError("expecting an integer value, got a {}".format(type(value))) @@ -83,6 +90,7 @@ def __set__(self, instance, value): class varElement(var): + def __set__(self, instance, value): if not isinstance(value, Element): raise ValueError("expecting an Element (or inherited) " @@ -155,13 +163,42 @@ def _applyDefaults(elements): e._safeset("isEncrypted", e.sink.isEncrypted) +def _describe_classes(classes): + for name in classes: + klass = getattr(sys.modules[__name__], name, None) + if klass is None: + logger.error("No such class to describe: %s\n", name) + sys.exit(1) + print("{} class attributes:".format(name)) + attrs = [] + for i in dir(klass): + if i.startswith("_") or callable(getattr(klass, i)): + continue + attrs.append(i) + longest = len(max(attrs, key=len)) + 2 + for i in attrs: + attr = getattr(klass, i, {}) + docs = [] + if isinstance(attr, var): + if attr.doc: + docs.append(attr.doc) + if attr.required: + docs.append("required") + if attr.default or isinstance(attr.default, bool): + docs.append("default: {}".format(attr.default)) + print(" {}{}".format(i.ljust(longest, " "), ", ".join(docs))) + print() + + ''' End of help functions ''' class Threat(): - id = varString("") + """Represents a possible threat""" + + id = varString("", required=True) description = varString("") - condition = varString("") + condition = varString("", doc="a Python expression that should evaluate to a boolean True or False") details = varString("") severity = varString("") mitigations = varString("") @@ -169,7 +206,6 @@ class Threat(): references = varString("") target = () - ''' Represents a possible threat ''' def __init__(self, json_read): self.id = json_read['SID'] self.description = json_read['description'] @@ -202,9 +238,21 @@ def apply(self, target): class Finding(): - ''' This class represents a Finding - the element in question and a description of the finding ''' + """Represents a Finding - the element in question and a description of the finding """ + + element = varElement(None, required=True, doc="Element this finding applies to") + target = varString("", doc="Name of the element this finding applies to") + description = varString("", required=True, doc="Threat description") + details = varString("", required=True, doc="Threat details") + severity = varString("", required=True, doc="Threat severity") + mitigations = varString("", required=True, doc="Threat mitigations") + example = varString("", required=True, doc="Threat example") + id = varString("", required=True, doc="Threat ID") + references = varString("", required=True, doc="Threat references") + def __init__(self, element, description, details, severity, mitigations, example, id, references): - self.target = element + self.target = element.name + self.element = element self.description = description self.details = details self.severity = severity @@ -215,7 +263,8 @@ def __init__(self, element, description, details, severity, mitigations, example class TM(): - ''' Describes the threat model administratively, and holds all details during a run ''' + """Describes the threat model administratively, and holds all details during a run""" + _BagOfFlows = [] _BagOfElements = [] _BagOfThreats = [] @@ -223,11 +272,13 @@ class TM(): _BagOfBoundaries = [] _threatsExcluded = [] _sf = None - description = varString("") + name = varString("", required=True, doc="Model name") + description = varString("", required=True, doc="Model description") threatsFile = varString(dirname(__file__) + "/threatlib/threats.json", - onSet=lambda i, v: i._init_threats()) - isOrdered = varBool(False) - mergeResponses = varBool(False) + onSet=lambda i, v: i._init_threats(), + doc="JSON file with custom threats") + isOrdered = varBool(False, doc="Automatically order all Dataflows") + mergeResponses = varBool(False, doc="Merge response edges in DFDs") def __init__(self, name, **kwargs): for key, value in kwargs.items(): @@ -260,7 +311,7 @@ def resolve(self): if e.inScope is True: for t in TM._BagOfThreats: if t.apply(e) is True: - TM._BagOfFindings.append(Finding(e.name, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references)) + TM._BagOfFindings.append(Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references)) def check(self): if self.description is None: @@ -328,26 +379,21 @@ def process(self): if result.exclude is not None: TM._threatsExcluded = result.exclude.split(",") if result.describe is not None: - try: - one_word = result.describe.split()[0] - c = eval(one_word) - except Exception: - stderr.write("No such class to describe: {}\n".format(result.describe)) - exit(-1) - print("The following properties are available for " + result.describe) - [print("\t{}".format(i)) for i in dir(c) if not callable(i) and match("__", i) is None] + _describe_classes(result.describe.split()) if result.list is True: [print("{} - {}".format(t.id, t.description)) for t in TM._BagOfThreats] - exit(0) + sys.exit(0) class Element(): - name = varString("") + """A generic element""" + + name = varString("", required=True) description = varString("") - inBoundary = varBoundary(None) + inBoundary = varBoundary(None, doc="Trust boundary this element exists in") + inScope = varBool(True, doc="Is the element in scope of the threat model") onAWS = varBool(False) isHardened = varBool(False) - inScope = varBool(True) implementsAuthenticationScheme = varBool(False) implementsNonce = varBool(False) handlesResources = varBool(False) @@ -446,6 +492,11 @@ def inside(self, *boundaries): class Lambda(Element): + """A lambda function running in a Function-as-a-Service (FaaS) environment""" + + port = varInt(-1, doc="Default TCP port for outgoing data flows") + protocol = varString("", doc="Default network protocol for outgoing data flows") + data = varString("", doc="Default type of data in outgoing data flows") onAWS = varBool(True) authenticatesSource = varBool(False) hasAccessControl = varBool(False) @@ -459,7 +510,6 @@ class Lambda(Element): environment = varString("") implementsAPI = varBool(False) authorizesSource = varBool(False) - data = varString("") def __init__(self, name, **kwargs): super().__init__(name, **kwargs) @@ -475,10 +525,12 @@ def dfd(self, **kwargs): class Server(Element): - port = varInt(-1) - isEncrypted = varBool(False) - protocol = varString("") - data = varString("") + """An entity processing data""" + + port = varInt(-1, doc="Default TCP port for incoming data flows") + isEncrypted = varBool(False, doc="Requires incoming data flow to be encrypted") + protocol = varString("", doc="Default network protocol for incoming data flows") + data = varString("", doc="Default type of data in incoming data flows") providesConfidentiality = varBool(False) providesIntegrity = varBool(False) authenticatesSource = varBool(False) @@ -528,10 +580,12 @@ def __init__(self, name, **kwargs): class Datastore(Element): - port = varInt(-1) - isEncrypted = varBool(False) - protocol = varString("") - data = varString("") + """An entity storing data""" + + port = varInt(-1, doc="Default TCP port for incoming data flows") + isEncrypted = varBool(False, doc="Requires incoming data flow to be encrypted") + protocol = varString("", doc="Default network protocol for incoming data flows") + data = varString("", doc="Default type of data in incoming data flows") onRDS = varBool(False) storesLogData = varBool(False) storesPII = varBool(False) @@ -566,9 +620,11 @@ def dfd(self, **kwargs): class Actor(Element): - port = varInt(-1) - protocol = varString("") - data = varString("") + """An entity usually initiating actions""" + + port = varInt(-1, doc="Default TCP port for outgoing data flows") + protocol = varString("", doc="Default network protocol for outgoing data flows") + data = varString("", doc="Default type of data in outgoing data flows") def __init__(self, name, **kwargs): super().__init__(name, **kwargs) @@ -582,10 +638,12 @@ def dfd(self, **kwargs): class Process(Element): - port = varInt(-1) - isEncrypted = varBool(False) - protocol = varString("") - data = varString("") + """An entity processing data""" + + port = varInt(-1, doc="Default TCP port for incoming data flows") + isEncrypted = varBool(False, doc="Requires incoming data flow to be encrypted") + protocol = varString("", doc="Default network protocol for incoming data flows") + data = varString("", doc="Default type of data in incoming data flows") codeType = varString("Unmanaged") implementsCommunicationProtocol = varBool(False) providesConfidentiality = varBool(False) @@ -645,20 +703,21 @@ def dfd(self, **kwargs): class Dataflow(Element): - source = varElement(None) - sink = varElement(None) - isResponse = varBool(False) - response = varElement(None) - responseTo = varElement(None) - srcPort = varInt(-1) - dstPort = varInt(-1) - isEncrypted = varBool(False) - protocol = varString("") - data = varString("") + """A data flow from a source to a sink""" + + source = varElement(None, required=True) + sink = varElement(None, required=True) + isResponse = varBool(False, doc="Is a response to another data flow") + response = varElement(None, doc="Another data flow that is a response to this one") + responseTo = varElement(None, doc="Is a response to this data flow") + srcPort = varInt(-1, doc="Source TCP port") + dstPort = varInt(-1, doc="Destination TCP port") + isEncrypted = varBool(False, doc="Is the data encrypted") + protocol = varString("", doc="Protocol used in this data flow") + data = varString("", "Type of data carried in this data flow") authenticatedWith = varBool(False) - order = varInt(-1) + order = varInt(-1, doc="Number of this data flow in the threat model") implementsCommunicationProtocol = varBool(False) - name = varString("") note = varString("") usesVPN = varBool(False) authorizesSource = varBool(False) @@ -704,6 +763,8 @@ def dfd(self, mergeResponses=False, **kwargs): class Boundary(Element): + """Trust boundary""" + def __init__(self, name, **kwargs): super().__init__(name, **kwargs) if name not in TM._BagOfBoundaries: