Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Provides usb preflight checks and standardizes error messages #5

Merged
merged 7 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions securedrop_export/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@
from securedrop_export import export
from securedrop_export import main

CONFIG_PATH = "/etc/sd-export-config.json"


def start():
my_sub = export.SDExport(sys.argv[1])
my_sub = export.SDExport(sys.argv[1], CONFIG_PATH)
try:
# Halt immediately if target file is absent
if not os.path.exists(my_sub.archive):
msg = "File does not exist"
msg = "ERROR_FILE_NOT_FOUND"
my_sub.exit_gracefully(msg)
main.__main__(my_sub)
# Delete extracted achive from tempfile
shutil.rmtree(my_sub.tmpdir)
except Exception as e:
except Exception:
# exit with 0 return code otherwise the os will attempt to open
# the file with another application
msg = "Unhandled exception:"
my_sub.exit_gracefully(msg, e=e)
msg = "ERROR_GENERIC"
my_sub.exit_gracefully(msg)
200 changes: 123 additions & 77 deletions securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,26 @@ class Metadata(object):
"""

METADATA_FILE = "metadata.json"
SUPPORTED_EXPORT_METHODS = ["disk", "printer", "printer-test"]
SUPPORTED_EXPORT_METHODS = [
"usb-test", # general preflight check
"disk",
"disk-test", # disk preflight test
"printer",
"printer-test", # print test page
]
SUPPORTED_ENCRYPTION_METHODS = ["luks"]

def __init__(self, archive_path):
self.metadata_path = os.path.join(archive_path, self.METADATA_FILE)

try:
with open(self.metadata_path) as f:
json_config = json.loads(f.read())
self.export_method = json_config.get("device", None)
self.encryption_method = json_config.get("encryption_method", None)
self.encryption_key = json_config.get("encryption_key", None)
except Exception as e:

except Exception:
raise

def is_valid(self):
Expand All @@ -51,8 +59,7 @@ def is_valid(self):


class SDExport(object):

def __init__(self, archive):
def __init__(self, archive, config_path):
self.device = DEVICE
self.mountpoint = MOUNTPOINT
self.encrypted_device = ENCRYPTED_DEVICE
Expand All @@ -62,14 +69,22 @@ def __init__(self, archive):

self.brlaser_driver = BRLASER_DRIVER
self.brlaser_ppd = BRLASER_PPD

self.archive = archive
self.submission_dirname = os.path.basename(self.archive).split(".")[0]
self.submission_dirname = os.path.basename(self.archive).split(".")[0]
self.target_dirname = "sd-export-{}".format(
datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
)
self.tmpdir = tempfile.mkdtemp()

try:
with open(config_path) as f:
json_config = json.loads(f.read())
self.pci_bus_id = json_config.get("pci_bus_id", None)
if self.pci_bus_id is None:
raise
except Exception:
self.exit_gracefully("ERROR_CONFIG")

def exit_gracefully(self, msg, e=False):
"""
Expand All @@ -92,32 +107,65 @@ def exit_gracefully(self, msg, e=False):
sys.stderr.write("\n")
# exit with 0 return code otherwise the os will attempt to open
# the file with another application
self.popup_message("Export error: {}".format(msg))
sys.exit(0)


def popup_message(self, msg):
try:
subprocess.check_call([
"notify-send",
"--expire-time", "3000",
"--icon", "/usr/share/securedrop/icons/sd-logo.png",
"SecureDrop: {}".format(msg)
])
subprocess.check_call(
[
"notify-send",
"--expire-time",
"3000",
"--icon",
"/usr/share/securedrop/icons/sd-logo.png",
"SecureDrop: {}".format(msg),
]
)
except subprocess.CalledProcessError as e:
msg = "Error sending notification:"
self.exit_gracefully(msg, e=e)


def extract_tarball(self):
try:
with tarfile.open(self.archive) as tar:
tar.extractall(self.tmpdir)
except Exception as e:
print (e)
msg = "Error opening export bundle: "
self.exit_gracefully(msg, e=e)
except Exception:
msg = "ERROR_EXTRACTION"
self.exit_gracefully(msg)

def check_usb_connected(self):

# If the USB is not attached via qvm-usb attach, lsusb will return empty string and a
# return code of 1
try:
p = subprocess.check_output(["lsusb", "-s", "{}:".format(self.pci_bus_id)])
except subprocess.CalledProcessError:
msg = "ERROR_USB_CONFIGURATION"
self.exit_gracefully(msg)
n_usb = len(p.decode("utf-8").rstrip().split("\n"))
# If there is one device, it is the root hub.
if n_usb == 1:
msg = "USB_NOT_CONNECTED"
self.exit_gracefully(msg)
# If there are two devices, it's the root hub and another device (presumably for export)
elif n_usb == 2:
msg = "USB_CONNECTED"
self.exit_gracefully(msg)
# Else the result is unexpected
else:
msg = "ERROR_USB_CHECK"
self.exit_gracefully(msg)

def check_luks_volume(self):
try:
# cryptsetup isLuks returns 0 if the device is a luks volume
# subprocess with throw if the device is not luks (rc !=0)
p = subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE])
msg = "USB_ENCRYPTED"
self.exit_gracefully(msg)
except subprocess.CalledProcessError:
msg = "USB_NO_SUPPORTED_ENCRYPTION"
self.exit_gracefully(msg)

def unlock_luks_volume(self, encryption_key):
# the luks device is not already unlocked
Expand All @@ -131,10 +179,9 @@ def unlock_luks_volume(self, encryption_key):
p.communicate(input=str.encode(encryption_key, "utf-8"))
rc = p.returncode
if rc != 0:
msg = "Bad passphrase or luks error."
msg = "USB_BAD_PASSPHRASE"
self.exit_gracefully(msg)


def mount_volume(self):
# mount target not created
if not os.path.exists(self.mountpoint):
Expand All @@ -148,73 +195,66 @@ def mount_volume(self):
self.mountpoint,
]
)
subprocess.check_call(["sudo", "chown", "-R", "user:user", self.mountpoint])
except subprocess.CalledProcessError:
# clean up
subprocess.check_call(
[
"sudo",
"chown",
"-R", "user:user", self.mountpoint,
]
["sudo", "cryptsetup", "luksClose", self.encrypted_device]
)
except subprocess.CalledProcessError as e:
# clean up
subprocess.check_call(["sudo", "cryptsetup", "luksClose", self.encrypted_device])
msg = "An error occurred while mounting disk: "
self.exit_gracefully(msg, e=e)

msg = "ERROR_USB_MOUNT"
self.exit_gracefully(msg)

def copy_submission(self):
# move files to drive (overwrites files with same filename) and unmount drive
try:
target_path = os.path.join(self.mountpoint, self.target_dirname)
subprocess.check_call(["mkdir", target_path])
export_data = os.path.join(
self.tmpdir, "export_data/"
)
export_data = os.path.join(self.tmpdir, "export_data/")
subprocess.check_call(["cp", "-r", export_data, target_path])
self.popup_message("Files exported successfully to disk.")
except (subprocess.CalledProcessError, OSError) as e:
msg = "Error writing to disk:"
self.exit_gracefully(msg, e=e)
except (subprocess.CalledProcessError, OSError):
msg = "ERROR_USB_WRITE"
self.exit_gracefully(msg)
finally:
# Finally, we sync the filesystem, unmount the drive and lock the
# luks volume, and exit 0
subprocess.check_call(["sync"])
subprocess.check_call(["sudo", "umount", self.mountpoint])
subprocess.check_call(["sudo", "cryptsetup", "luksClose", self.encrypted_device])
subprocess.check_call(
["sudo", "cryptsetup", "luksClose", self.encrypted_device]
)
subprocess.check_call(["rm", "-rf", self.tmpdir])
sys.exit(0)


def wait_for_print(self):
# use lpstat to ensure the job was fully transfered to the printer
# returns True if print was successful, otherwise will throw exceptions
signal.signal(signal.SIGALRM, handler)
signal.alarm(self.printer_wait_timeout)
printer_idle_string = "printer {} is idle".format(self.printer_name)
while(True):
while True:
try:
output = subprocess.check_output(["lpstat", "-p", self.printer_name])
if(printer_idle_string in output.decode("utf-8")):
if printer_idle_string in output.decode("utf-8"):
return True
else:
time.sleep(5)
except subprocess.CalledProcessError as e:
msg = "Error while retrieving print status"
self.exit_gracefully(msg, e=e)
except TimeoutException as e:
msg = "Timeout when getting printer information"
self.exit_gracefully(msg, e=e)
except subprocess.CalledProcessError:
msg = "ERROR_PRINT"
self.exit_gracefully(msg)
except TimeoutException:
msg = "ERROR_PRINT"
self.exit_gracefully(msg)
return True


def get_printer_uri(self):
# Get the URI via lpinfo and only accept URIs of supported printers
printer_uri = ""
try:
output = subprocess.check_output(["sudo", "lpinfo", "-v"])
except subprocess.CalledProcessError as e:
msg = "Error retrieving printer uri."
self.exit_gracefully(msg, e=e)
except subprocess.CalledProcessError:
msg = "ERROR_PRINTER_URI"
self.exit_gracefully(msg)

# fetch the usb printer uri
for line in output.split():
Expand All @@ -224,28 +264,32 @@ def get_printer_uri(self):
# verify that the printer is supported, else exit
if printer_uri == "":
# No usb printer is connected
self.exit_gracefully("USB Printer not found")
self.exit_gracefully("ERROR_PRINTER_NOT_FOUND")
elif "Brother" in printer_uri:
return printer_uri
else:
# printer url is a make that is unsupported
self.exit_gracefully("USB Printer not supported")

self.exit_gracefully("ERROR_PRINTER_NOT_SUPPORTED")

def install_printer_ppd(self, uri):
# Some drivers don't come with ppd files pre-compiled, we must compile them
# Some drivers don't come with ppd files pre-compiled, we must compile them
if "Brother" in uri:
try:
subprocess.check_call(
["sudo", "ppdc", self.brlaser_driver, "-d", "/usr/share/cups/model/"]
[
"sudo",
"ppdc",
self.brlaser_driver,
"-d",
"/usr/share/cups/model/",
]
)
except subprocess.CalledProcessError as e:
msg = "Error installing ppd file for printer {}.".format(uri)
self.exit_gracefully(msg, e=e)
except subprocess.CalledProcessError:
msg = "ERROR_PRINTER_DRIVER_UNAVAILBLE"
self.exit_gracefully(msg)
return self.brlaser_ppd
# Here, we could support ppd drivers for other makes or models in the future


def setup_printer(self, printer_uri, printer_ppd):
try:
# Add the printer using lpadmin
Expand All @@ -267,18 +311,14 @@ def setup_printer(self, printer_uri, printer_ppd):
subprocess.check_call(
["sudo", "lpadmin", "-p", self.printer_name, "-u", "allow:user"]
)
except subprocess.CalledProcessError as e:
msg = "Error setting up printer {} at {} using {}.".format(
self.printer_name, printer_uri, printer_ppd
)
self.exit_gracefully(msg, e=e)

except subprocess.CalledProcessError:
msg = "ERROR_PRINTER_INSTALL"
self.exit_gracefully(msg)

def print_test_page(self):
self.print_file("/usr/share/cups/data/testprint")
self.popup_message("Printing test page")


def print_all_files(self):
files_path = os.path.join(self.tmpdir, "export_data/")
files = os.listdir(files_path)
Expand All @@ -290,16 +330,23 @@ def print_all_files(self):
msg = "Printing document {} of {}".format(print_count, len(files))
self.popup_message(msg)


def is_open_office_file(self, filename):
OPEN_OFFICE_FORMATS = [".doc", ".docx", ".xls", ".xlsx",
".ppt", ".pptx", ".odt", ".ods", ".odp"]
OPEN_OFFICE_FORMATS = [
".doc",
".docx",
".xls",
".xlsx",
".ppt",
".pptx",
".odt",
".ods",
".odp",
]
for extension in OPEN_OFFICE_FORMATS:
if os.path.basename(filename).endswith(extension):
return True
return False


def print_file(self, file_to_print):
try:
# if the file to print is an (open)office document, we need to call unoconf to convert
Expand All @@ -312,11 +359,10 @@ def print_file(self, file_to_print):
file_to_print = converted_path

subprocess.check_call(["xpp", "-P", self.printer_name, file_to_print])
except subprocess.CalledProcessError as e:
msg = "Error printing file {} with printer {}.".format(
file_to_print, self.printer_name
)
self.exit_gracefully(msg, e=e)
except subprocess.CalledProcessError:
msg = "ERROR_PRINT"
self.exit_gracefully(msg)


## class ends here
class TimeoutException(Exception):
Expand Down
Loading