Skip to content

Commit

Permalink
describe class attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Waś committed Mar 22, 2020
1 parent 4a25881 commit c851713
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ build: setup.py

.PHONY: describe
describe:
for i in TM Element Server ExternalEntity Datastore Actor Process SetOfProcesses Dataflow Boundary Lambda Finding; do ./tm.py --describe $$i; done
./tm.py --describe "TM Element Server ExternalEntity Datastore Actor Process SetOfProcesses Dataflow Boundary Lambda Finding"

.PHONY: image
image:
Expand Down
150 changes: 96 additions & 54 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from collections.abc import Iterable
from hashlib import sha224
from os.path import dirname
from re import match
from sys import exit, stderr
from textwrap import wrap
from weakref import WeakKeyDictionary

Expand All @@ -28,15 +26,20 @@

class var(object):
''' A descriptor that allows setting a value only once '''
def __init__(self, default, onSet=None):

def __init__(self, default, required=False, doc="", onSet=None):
self.default = default
self.required = required
self.doc = doc
self.data = WeakKeyDictionary()
self.onSet = onSet

def __get__(self, instance, owner):
# when x.d is called we get here
# instance = x
# owner = type(x)
if instance is None:
return self
return self.data.get(instance, self.default)

def __set__(self, instance, value):
Expand All @@ -45,44 +48,49 @@ def __set__(self, instance, value):
# value = val
if instance in self.data:
raise ValueError(
"cannot overwrite {} value with {}, already set to {}".format(
self.__class__.__name__, value, self.data[instance]
)
)
"cannot overwrite {} value with {}, already set to {}".format(
self.__class__.__name__, value, self.data[instance]
)
)
self.data[instance] = value
if self.onSet is not None:
self.onSet(instance, value)


class varString(var):

def __set__(self, instance, value):
if not isinstance(value, str):
raise ValueError("expecting a String value, got a {}".format(type(value)))
super().__set__(instance, value)


class varBoundary(var):

def __set__(self, instance, value):
if not isinstance(value, Boundary):
raise ValueError("expecting a Boundary value, got a {}".format(type(value)))
super().__set__(instance, value)


class varBool(var):

def __set__(self, instance, value):
if not isinstance(value, bool):
raise ValueError("expecting a boolean value, got a {}".format(type(value)))
super().__set__(instance, value)


class varInt(var):

def __set__(self, instance, value):
if not isinstance(value, int):
raise ValueError("expecting an integer value, got a {}".format(type(value)))
super().__set__(instance, value)


class varElement(var):

def __set__(self, instance, value):
if not isinstance(value, Element):
raise ValueError("expecting an Element (or inherited) "
Expand Down Expand Up @@ -155,13 +163,40 @@ def _applyDefaults(elements):
e._safeset("isEncrypted", e.sink.isEncrypted)


def _describe_classes(classes):
for name in classes:
klass = getattr(sys.modules[__name__], name, None)
if klass is None:
logger.error("No such class to describe: %s\n", name)
sys.exit(1)
print("{} class attributes:".format(name))
attrs = []
for i in dir(klass):
if i.startswith("_") or callable(getattr(klass, i)):
continue
attrs.append(i)
longest = len(max(attrs, key=len)) + 2
for i in attrs:
attr = getattr(klass, i, {})
docs = []
if isinstance(attr, var):
if attr.doc:
docs.append(attr.doc)
if attr.required:
docs.append("required")
if attr.default or isinstance(attr.default, bool):
docs.append("default: {}".format(attr.default))
print(" {}{}".format(i.ljust(longest, " "), ", ".join(docs)))
print()


''' End of help functions '''


class Threat():
id = varString("")
id = varString("", required=True)
description = varString("")
condition = varString("")
condition = varString("", doc="a Python expression that should evaluate to a boolean True or False")
details = varString("")
severity = varString("")
mitigations = varString("")
Expand Down Expand Up @@ -202,9 +237,20 @@ def apply(self, target):


class Finding():
element = varElement(None, required=True, doc="Element this finding applies to")
target = varString("", doc="Name of the element this finding applies to")
description = varString("", required=True, doc="Threat description")
details = varString("", required=True, doc="Threat details")
severity = varString("", required=True, doc="Threat severity")
mitigations = varString("", required=True, doc="Threat mitigations")
example = varString("", required=True, doc="Threat example")
id = varString("", required=True, doc="Threat ID")
references = varString("", required=True, doc="Threat references")

''' This class represents a Finding - the element in question and a description of the finding '''
def __init__(self, element, description, details, severity, mitigations, example, id, references):
self.target = element
self.target = element.name
self.element = element
self.description = description
self.details = details
self.severity = severity
Expand All @@ -223,11 +269,13 @@ class TM():
_BagOfBoundaries = []
_threatsExcluded = []
_sf = None
description = varString("")
name = varString("", required=True, doc="Model name")
description = varString("", required=True, doc="Model description")
threatsFile = varString(dirname(__file__) + "/threatlib/threats.json",
onSet=lambda i, v: i._init_threats())
isOrdered = varBool(False)
mergeResponses = varBool(False)
onSet=lambda i, v: i._init_threats(),
doc="JSON file with custom threats")
isOrdered = varBool(False, doc="Automatically order all Dataflows")
mergeResponses = varBool(False, doc="Merge response edges in DFDs")

def __init__(self, name, **kwargs):
for key, value in kwargs.items():
Expand Down Expand Up @@ -260,7 +308,7 @@ def resolve(self):
if e.inScope is True:
for t in TM._BagOfThreats:
if t.apply(e) is True:
TM._BagOfFindings.append(Finding(e.name, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references))
TM._BagOfFindings.append(Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references))

def check(self):
if self.description is None:
Expand Down Expand Up @@ -328,26 +376,19 @@ def process(self):
if result.exclude is not None:
TM._threatsExcluded = result.exclude.split(",")
if result.describe is not None:
try:
one_word = result.describe.split()[0]
c = eval(one_word)
except Exception:
stderr.write("No such class to describe: {}\n".format(result.describe))
exit(-1)
print("The following properties are available for " + result.describe)
[print("\t{}".format(i)) for i in dir(c) if not callable(i) and match("__", i) is None]
_describe_classes(result.describe.split())
if result.list is True:
[print("{} - {}".format(t.id, t.description)) for t in TM._BagOfThreats]
exit(0)
sys.exit(0)


class Element():
name = varString("")
name = varString("", required=True)
description = varString("")
inBoundary = varBoundary(None)
inScope = varBool(True, doc="Is the element in scope of the threat model")
onAWS = varBool(False)
isHardened = varBool(False)
inScope = varBool(True)
implementsAuthenticationScheme = varBool(False)
implementsNonce = varBool(False)
handlesResources = varBool(False)
Expand Down Expand Up @@ -446,6 +487,9 @@ def inside(self, *boundaries):


class Lambda(Element):
port = varInt(-1, doc="Default TCP port for outgoing data flows")
protocol = varString("", doc="Default network protocol for outgoing data flows")
data = varString("", doc="Default type of data in outgoing data flows")
onAWS = varBool(True)
authenticatesSource = varBool(False)
hasAccessControl = varBool(False)
Expand All @@ -459,7 +503,6 @@ class Lambda(Element):
environment = varString("")
implementsAPI = varBool(False)
authorizesSource = varBool(False)
data = varString("")

def __init__(self, name, **kwargs):
super().__init__(name, **kwargs)
Expand All @@ -475,10 +518,10 @@ def dfd(self, **kwargs):


class Server(Element):
port = varInt(-1)
isEncrypted = varBool(False)
protocol = varString("")
data = varString("")
port = varInt(-1, doc="Default TCP port for incoming data flows")
isEncrypted = varBool(False, doc="Requires incoming data flow to be encrypted")
protocol = varString("", doc="Default network protocol for incoming data flows")
data = varString("", doc="Default type of data in incoming data flows")
providesConfidentiality = varBool(False)
providesIntegrity = varBool(False)
authenticatesSource = varBool(False)
Expand Down Expand Up @@ -528,10 +571,10 @@ def __init__(self, name, **kwargs):


class Datastore(Element):
port = varInt(-1)
isEncrypted = varBool(False)
protocol = varString("")
data = varString("")
port = varInt(-1, doc="Default TCP port for incoming data flows")
isEncrypted = varBool(False, doc="Requires incoming data flow to be encrypted")
protocol = varString("", doc="Default network protocol for incoming data flows")
data = varString("", doc="Default type of data in incoming data flows")
onRDS = varBool(False)
storesLogData = varBool(False)
storesPII = varBool(False)
Expand Down Expand Up @@ -566,9 +609,9 @@ def dfd(self, **kwargs):


class Actor(Element):
port = varInt(-1)
protocol = varString("")
data = varString("")
port = varInt(-1, doc="Default TCP port for outgoing data flows")
protocol = varString("", doc="Default network protocol for outgoing data flows")
data = varString("", doc="Default type of data in outgoing data flows")

def __init__(self, name, **kwargs):
super().__init__(name, **kwargs)
Expand All @@ -582,10 +625,10 @@ def dfd(self, **kwargs):


class Process(Element):
port = varInt(-1)
isEncrypted = varBool(False)
protocol = varString("")
data = varString("")
port = varInt(-1, doc="Default TCP port for incoming data flows")
isEncrypted = varBool(False, doc="Requires incoming data flow to be encrypted")
protocol = varString("", doc="Default network protocol for incoming data flows")
data = varString("", doc="Default type of data in incoming data flows")
codeType = varString("Unmanaged")
implementsCommunicationProtocol = varBool(False)
providesConfidentiality = varBool(False)
Expand Down Expand Up @@ -645,20 +688,19 @@ def dfd(self, **kwargs):


class Dataflow(Element):
source = varElement(None)
sink = varElement(None)
isResponse = varBool(False)
response = varElement(None)
responseTo = varElement(None)
srcPort = varInt(-1)
dstPort = varInt(-1)
isEncrypted = varBool(False)
protocol = varString("")
data = varString("")
source = varElement(None, required=True)
sink = varElement(None, required=True)
isResponse = varBool(False, doc="Is a response to another data flow")
response = varElement(None, doc="Another data flow that is a response to this one")
responseTo = varElement(None, doc="Is a response to this data flow")
srcPort = varInt(-1, doc="Source TCP port")
dstPort = varInt(-1, doc="Destination TCP port")
isEncrypted = varBool(False, doc="Is the data encrypted")
protocol = varString("", doc="Protocol used in this data flow")
data = varString("", "Type of data carried in this data flow")
authenticatedWith = varBool(False)
order = varInt(-1)
order = varInt(-1, doc="Number of this data flow in the threat model")
implementsCommunicationProtocol = varBool(False)
name = varString("")
note = varString("")
usesVPN = varBool(False)
authorizesSource = varBool(False)
Expand Down

0 comments on commit c851713

Please sign in to comment.