From 9763192a3bb269775fcd1f07e6aadca85d5ea9c6 Mon Sep 17 00:00:00 2001 From: mickael e Date: Wed, 24 Jul 2019 09:15:59 -0400 Subject: [PATCH 1/7] Provides usb preflight checks Also check if a USB device is connected via `usb-test` action: * returns `USB_CONNECTED` if a device is connected to sd-export Qube * returns `USB_NOT_CONNECTED` if a device is not connnected to sd-export Qube Also checks if a Drive is LUKS-encrypted via `disk-test` action: * returns `USB_ENCRYPTED` if the usb device is LUKS-encrypted * returns `USB_NO_SUPPORTED_ENCRYPTION` if the usb device is not LUKS-encrypted --- securedrop_export/export.py | 45 +++++++++++++++++++- securedrop_export/main.py | 6 ++- tests/test_export.py | 82 +++++++++++++++++++++++++++++++++---- 3 files changed, 122 insertions(+), 11 deletions(-) diff --git a/securedrop_export/export.py b/securedrop_export/export.py index f5ce56a..e1343c5 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -18,7 +18,7 @@ ENCRYPTED_DEVICE = "encrypted_volume" BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" - +PCI_BUS_ID = "002:" class Metadata(object): """ @@ -26,7 +26,13 @@ class Metadata(object): """ METADATA_FILE = "metadata.json" - SUPPORTED_EXPORT_METHODS = ["disk", "printer", "printer-test"] + SUPPORTED_EXPORT_METHODS = [ + "usb-test", # general preflight check + "disk", + "disk-test", # disk preflight test + "printer", + "printer-test" # print test page + ] SUPPORTED_ENCRYPTION_METHODS = ["luks"] def __init__(self, archive_path): @@ -119,6 +125,41 @@ def extract_tarball(self): self.exit_gracefully(msg, e=e) + def check_usb_connected(self): + # Rely on the output of lsusb on the bus assigned to. We might need to make this variable configurable + # In the future and extracted from config.json + p = subprocess.check_output(["lsusb", "-s", PCI_BUS_ID]) + # Empty string means a likely wrong PCI_BUS_ID + if p == "": + msg = "ERROR_USB_CHECK" + self.exit_gracefully(msg) + n_usb = len(p.rstrip().split("\n")) + # If there is one device, it is the root hub. + if n_usb == 1: + msg = "USB_NOT_CONNECTED" + self.exit_gracefully(msg) + # If there are two devices, it's the root hub and another device (presumably for export) + elif n_usb == 2: + msg = "USB_CONNECTED" + self.exit_gracefully(msg) + # Else the result is unexpected + else: + msg = "ERROR_USB_CHECK" + self.exit_gracefully(msg) + + + def check_luks_volume(self): + try: + # cryptsetup isLuks returns 0 if the device is a luks volume + # subprocess with throw if the device is not luks (rc !=0) + p = subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE]) + msg = "USB_ENCRYPTED" + self.exit_gracefully(msg) + except subprocess.CalledProcessError as e: + msg = "USB_NO_SUPPORTED_ENCRYPTION" + self.exit_gracefully(msg) + + def unlock_luks_volume(self, encryption_key): # the luks device is not already unlocked if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)): diff --git a/securedrop_export/main.py b/securedrop_export/main.py index 79b5d72..58dca3e 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -14,11 +14,15 @@ def __main__(submission): submission.exit_gracefully(msg, e=e) if submission.archive_metadata.is_valid(): - if submission.archive_metadata.export_method == "disk": + if submission.archive_metadata.export_method == "usb-test": + submission.check_usb_connected() + elif submission.archive_metadata.export_method == "disk": # exports all documents in the archive to luks-encrypted volume submission.unlock_luks_volume(submission.archive_metadata.encryption_key) submission.mount_volume() submission.copy_submission() + elif submission.archive_metadata.export_method == "disk-test": + submission.check_luks_volume() elif submission.archive_metadata.export_method == "printer": # prints all documents in the archive printer_uri = submission.get_printer_uri() diff --git a/tests/test_export.py b/tests/test_export.py index 85dbcd2..577667e 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -2,6 +2,7 @@ import os import pytest +import subprocess import tempfile from securedrop_export import export @@ -9,14 +10,10 @@ SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa - -# This below stanza is only necessary because the export code is not -# structured as a module. If a Python module were created called -# `export`, we could simply do `import export` -# path_to_script = os.path.join( -# os.path.dirname(os.path.abspath(__file__)), "send-to-usb" -# ) -# securedropexport = imp.load_source("send-to-usb", path_to_script) +SAMPLE_OUTPUT_NO_USB="Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa +SAMPLE_OUTPUT_USB="Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa +SAMPLE_OUTPUT_USB_ERROR="" +SAMPLE_OUTPUT_USB_ERROR2="h\ne\nl\nl\no" def test_exit_gracefully_no_exception(capsys): @@ -169,3 +166,72 @@ def test_is_open_office_file(capsys, open_office_paths): def test_is_not_open_office_file(capsys, open_office_paths): submission = export.SDExport("") assert not submission.is_open_office_file(open_office_paths) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB) +def test_usb_precheck_connected(mocked_call, capsys): + submission = export.SDExport("testfile") + expected_message = "USB_NOT_CONNECTED" + mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + with pytest.raises(SystemExit) as sysexit: + result = submission.check_usb_connected() + mocked_exit.assert_called_once_with(expected_message) + + assert sysexit.value.code == 0 + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB) +def test_usb_precheck_disconnected(mocked_call, capsys): + submission = export.SDExport("testfile") + expected_message = "USB_CONNECTED" + mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + with pytest.raises(SystemExit) as sysexit: + result = submission.check_usb_connected() + mocked_exit.assert_called_once_with(expected_message) + + assert sysexit.value.code == 0 + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR) +def test_usb_precheck_error(mocked_call, capsys): + submission = export.SDExport("testfile") + expected_message = "ERROR_USB_CHECK" + mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + with pytest.raises(SystemExit) as sysexit: + result = submission.check_usb_connected() + mocked_exit.assert_called_once_with(expected_message) + + assert sysexit.value.code == 0 + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2) +def test_usb_precheck_error_2(mocked_call, capsys): + submission = export.SDExport("testfile") + expected_message = "ERROR_USB_CHECK" + mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + with pytest.raises(SystemExit) as sysexit: + result = submission.check_usb_connected() + mocked_exit.assert_called_once_with(expected_message) + + assert sysexit.value.code == 0 + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + + +@mock.patch("subprocess.check_call") +def test_luks_precheck_encrypted(mocked_call, capsys): + submission = export.SDExport("testfile") + expected_message = "USB_ENCRYPTED" + with pytest.raises(SystemExit) as sysexit: + result = submission.check_luks_volume() + mocked_exit.assert_called_once_with(expected_message) + assert sysexit.value.code == 0 + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + From f511756937092823415ee0710cc6a581e4ecd55f Mon Sep 17 00:00:00 2001 From: mickael e Date: Thu, 25 Jul 2019 12:58:34 -0400 Subject: [PATCH 2/7] Standardize error messages for sd-export All errors return codes are now prefixed with `ERROR_` --- securedrop_export/export.py | 23 +++++++++-------------- securedrop_export/main.py | 4 ++-- tests/test_export.py | 2 +- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/securedrop_export/export.py b/securedrop_export/export.py index e1343c5..46fba8a 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -98,7 +98,6 @@ def exit_gracefully(self, msg, e=False): sys.stderr.write("\n") # exit with 0 return code otherwise the os will attempt to open # the file with another application - self.popup_message("Export error: {}".format(msg)) sys.exit(0) @@ -172,7 +171,7 @@ def unlock_luks_volume(self, encryption_key): p.communicate(input=str.encode(encryption_key, "utf-8")) rc = p.returncode if rc != 0: - msg = "Bad passphrase or luks error." + msg = "USB_BAD_PASSPHRASE" self.exit_gracefully(msg) @@ -199,7 +198,7 @@ def mount_volume(self): except subprocess.CalledProcessError as e: # clean up subprocess.check_call(["sudo", "cryptsetup", "luksClose", self.encrypted_device]) - msg = "An error occurred while mounting disk: " + msg = "ERROR_USB_MOUNT" self.exit_gracefully(msg, e=e) @@ -214,7 +213,7 @@ def copy_submission(self): subprocess.check_call(["cp", "-r", export_data, target_path]) self.popup_message("Files exported successfully to disk.") except (subprocess.CalledProcessError, OSError) as e: - msg = "Error writing to disk:" + msg = "ERROR_USB_WRITE" self.exit_gracefully(msg, e=e) finally: # Finally, we sync the filesystem, unmount the drive and lock the @@ -254,7 +253,7 @@ def get_printer_uri(self): try: output = subprocess.check_output(["sudo", "lpinfo", "-v"]) except subprocess.CalledProcessError as e: - msg = "Error retrieving printer uri." + msg = "ERROR_PRINTER_URI" self.exit_gracefully(msg, e=e) # fetch the usb printer uri @@ -265,12 +264,12 @@ def get_printer_uri(self): # verify that the printer is supported, else exit if printer_uri == "": # No usb printer is connected - self.exit_gracefully("USB Printer not found") + self.exit_gracefully("ERROR_PRINTER_NOT_FOUND") elif "Brother" in printer_uri: return printer_uri else: # printer url is a make that is unsupported - self.exit_gracefully("USB Printer not supported") + self.exit_gracefully("ERROR_PRINTER_NOT_SUPPORTED") def install_printer_ppd(self, uri): @@ -281,7 +280,7 @@ def install_printer_ppd(self, uri): ["sudo", "ppdc", self.brlaser_driver, "-d", "/usr/share/cups/model/"] ) except subprocess.CalledProcessError as e: - msg = "Error installing ppd file for printer {}.".format(uri) + msg = "ERROR_PRINTER_DRIVER_INSTALL" self.exit_gracefully(msg, e=e) return self.brlaser_ppd # Here, we could support ppd drivers for other makes or models in the future @@ -309,9 +308,7 @@ def setup_printer(self, printer_uri, printer_ppd): ["sudo", "lpadmin", "-p", self.printer_name, "-u", "allow:user"] ) except subprocess.CalledProcessError as e: - msg = "Error setting up printer {} at {} using {}.".format( - self.printer_name, printer_uri, printer_ppd - ) + msg = "ERROR_PRINTER_INSTALL" self.exit_gracefully(msg, e=e) @@ -354,9 +351,7 @@ def print_file(self, file_to_print): subprocess.check_call(["xpp", "-P", self.printer_name, file_to_print]) except subprocess.CalledProcessError as e: - msg = "Error printing file {} with printer {}.".format( - file_to_print, self.printer_name - ) + msg = "ERROR_PRINT" self.exit_gracefully(msg, e=e) ## class ends here diff --git a/securedrop_export/main.py b/securedrop_export/main.py index 58dca3e..a0b830c 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -10,7 +10,7 @@ def __main__(submission): try: submission.archive_metadata = export.Metadata(submission.tmpdir) except Exception as e: - msg = "Error parsing metadata: " + msg = "ERROR_METADATA_PARSING" submission.exit_gracefully(msg, e=e) if submission.archive_metadata.is_valid(): @@ -36,6 +36,6 @@ def __main__(submission): submission.setup_printer(printer_uri, printer_ppd) submission.print_test_page() else: - submission.exit_gracefully("Archive metadata is invalid") + submission.exit_gracefully("ERROR_ARCHIVE_METADATA") diff --git a/tests/test_export.py b/tests/test_export.py index 577667e..00996c9 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -132,7 +132,7 @@ def test_get_good_printer_uri(mocked_call): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) def test_get_bad_printer_uri(mocked_call, capsys): submission = export.SDExport("testfile") - expected_message = "USB Printer not found" + expected_message = "ERROR_PRINTER_NOT_FOUND" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: From 8cfa3bdc68b2541ca24ef24e31dc11fe97b93f34 Mon Sep 17 00:00:00 2001 From: mickael e Date: Thu, 1 Aug 2019 16:20:54 -0400 Subject: [PATCH 3/7] Add more return codes for errors --- securedrop_export/entrypoint.py | 6 ++--- securedrop_export/export.py | 43 ++++++++++++++++----------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/securedrop_export/entrypoint.py b/securedrop_export/entrypoint.py index 9958d5d..fbdec17 100755 --- a/securedrop_export/entrypoint.py +++ b/securedrop_export/entrypoint.py @@ -11,7 +11,7 @@ def start(): try: # Halt immediately if target file is absent if not os.path.exists(my_sub.archive): - msg = "File does not exist" + msg = "ERROR_FILE_NOT_FOUND" my_sub.exit_gracefully(msg) main.__main__(my_sub) # Delete extracted achive from tempfile @@ -19,5 +19,5 @@ def start(): except Exception as e: # exit with 0 return code otherwise the os will attempt to open # the file with another application - msg = "Unhandled exception:" - my_sub.exit_gracefully(msg, e=e) + msg = "ERROR_GENERIC" + my_sub.exit_gracefully(msg) diff --git a/securedrop_export/export.py b/securedrop_export/export.py index 46fba8a..12f918f 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -118,10 +118,9 @@ def extract_tarball(self): try: with tarfile.open(self.archive) as tar: tar.extractall(self.tmpdir) - except Exception as e: - print (e) - msg = "Error opening export bundle: " - self.exit_gracefully(msg, e=e) + except Exception: + msg = "ERROR_EXTRACTION" + self.exit_gracefully(msg) def check_usb_connected(self): @@ -154,7 +153,7 @@ def check_luks_volume(self): p = subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE]) msg = "USB_ENCRYPTED" self.exit_gracefully(msg) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: msg = "USB_NO_SUPPORTED_ENCRYPTION" self.exit_gracefully(msg) @@ -195,11 +194,11 @@ def mount_volume(self): "-R", "user:user", self.mountpoint, ] ) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: # clean up subprocess.check_call(["sudo", "cryptsetup", "luksClose", self.encrypted_device]) msg = "ERROR_USB_MOUNT" - self.exit_gracefully(msg, e=e) + self.exit_gracefully(msg) def copy_submission(self): @@ -212,9 +211,9 @@ def copy_submission(self): ) subprocess.check_call(["cp", "-r", export_data, target_path]) self.popup_message("Files exported successfully to disk.") - except (subprocess.CalledProcessError, OSError) as e: + except (subprocess.CalledProcessError, OSError): msg = "ERROR_USB_WRITE" - self.exit_gracefully(msg, e=e) + self.exit_gracefully(msg) finally: # Finally, we sync the filesystem, unmount the drive and lock the # luks volume, and exit 0 @@ -238,12 +237,12 @@ def wait_for_print(self): return True else: time.sleep(5) - except subprocess.CalledProcessError as e: - msg = "Error while retrieving print status" - self.exit_gracefully(msg, e=e) - except TimeoutException as e: - msg = "Timeout when getting printer information" - self.exit_gracefully(msg, e=e) + except subprocess.CalledProcessError: + msg = "ERROR_PRINT" + self.exit_gracefully(msg) + except TimeoutException: + msg = "ERROR_PRINT" + self.exit_gracefully(msg) return True @@ -254,7 +253,7 @@ def get_printer_uri(self): output = subprocess.check_output(["sudo", "lpinfo", "-v"]) except subprocess.CalledProcessError as e: msg = "ERROR_PRINTER_URI" - self.exit_gracefully(msg, e=e) + self.exit_gracefully(msg) # fetch the usb printer uri for line in output.split(): @@ -279,9 +278,9 @@ def install_printer_ppd(self, uri): subprocess.check_call( ["sudo", "ppdc", self.brlaser_driver, "-d", "/usr/share/cups/model/"] ) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: msg = "ERROR_PRINTER_DRIVER_INSTALL" - self.exit_gracefully(msg, e=e) + self.exit_gracefully(msg) return self.brlaser_ppd # Here, we could support ppd drivers for other makes or models in the future @@ -307,9 +306,9 @@ def setup_printer(self, printer_uri, printer_ppd): subprocess.check_call( ["sudo", "lpadmin", "-p", self.printer_name, "-u", "allow:user"] ) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: msg = "ERROR_PRINTER_INSTALL" - self.exit_gracefully(msg, e=e) + self.exit_gracefully(msg) def print_test_page(self): @@ -350,9 +349,9 @@ def print_file(self, file_to_print): file_to_print = converted_path subprocess.check_call(["xpp", "-P", self.printer_name, file_to_print]) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: msg = "ERROR_PRINT" - self.exit_gracefully(msg, e=e) + self.exit_gracefully(msg) ## class ends here class TimeoutException(Exception): From bee4790b4d8d5aaf7a69f3b4674057e4e598e7cf Mon Sep 17 00:00:00 2001 From: mickael e Date: Thu, 1 Aug 2019 16:50:52 -0400 Subject: [PATCH 4/7] Address review comments Configurable PCI_DEVICE_ID will be populated at provision time by salt logic --- securedrop_export/entrypoint.py | 3 +- securedrop_export/export.py | 26 ++++++----- securedrop_export/main.py | 13 ++---- tests/sd-export-config-bad-2.json | 3 ++ tests/sd-export-config-bad.json | 3 ++ tests/sd-export-config.json | 3 ++ tests/test_export.py | 72 +++++++++++++++++++++++-------- 7 files changed, 83 insertions(+), 40 deletions(-) create mode 100644 tests/sd-export-config-bad-2.json create mode 100644 tests/sd-export-config-bad.json create mode 100644 tests/sd-export-config.json diff --git a/securedrop_export/entrypoint.py b/securedrop_export/entrypoint.py index fbdec17..c5b2249 100755 --- a/securedrop_export/entrypoint.py +++ b/securedrop_export/entrypoint.py @@ -5,9 +5,10 @@ from securedrop_export import export from securedrop_export import main +CONFIG_PATH = "/etc/sd-export-config.json" def start(): - my_sub = export.SDExport(sys.argv[1]) + my_sub = export.SDExport(sys.argv[1], CONFIG_PATH) try: # Halt immediately if target file is absent if not os.path.exists(my_sub.archive): diff --git a/securedrop_export/export.py b/securedrop_export/export.py index 12f918f..7fe7fa5 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -18,7 +18,6 @@ ENCRYPTED_DEVICE = "encrypted_volume" BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" -PCI_BUS_ID = "002:" class Metadata(object): """ @@ -37,12 +36,14 @@ class Metadata(object): def __init__(self, archive_path): self.metadata_path = os.path.join(archive_path, self.METADATA_FILE) + try: with open(self.metadata_path) as f: json_config = json.loads(f.read()) self.export_method = json_config.get("device", None) self.encryption_method = json_config.get("encryption_method", None) self.encryption_key = json_config.get("encryption_key", None) + except Exception as e: raise @@ -58,7 +59,7 @@ def is_valid(self): class SDExport(object): - def __init__(self, archive): + def __init__(self, archive, config_path): self.device = DEVICE self.mountpoint = MOUNTPOINT self.encrypted_device = ENCRYPTED_DEVICE @@ -68,14 +69,21 @@ def __init__(self, archive): self.brlaser_driver = BRLASER_DRIVER self.brlaser_ppd = BRLASER_PPD - + self.archive = archive - self.submission_dirname = os.path.basename(self.archive).split(".")[0] + self.submission_dirname = os.path.basename(self.archive).split(".")[0] self.target_dirname = "sd-export-{}".format( datetime.datetime.now().strftime("%Y%m%d-%H%M%S") ) self.tmpdir = tempfile.mkdtemp() + try: + with open(config_path) as f: + json_config = json.loads(f.read()) + self.pci_bus_id = int(json_config.get("pci_bus_id", 2)) + except Exception as e: + self.exit_gracefully("ERROR_CONFIG") + def exit_gracefully(self, msg, e=False): """ @@ -124,10 +132,8 @@ def extract_tarball(self): def check_usb_connected(self): - # Rely on the output of lsusb on the bus assigned to. We might need to make this variable configurable - # In the future and extracted from config.json - p = subprocess.check_output(["lsusb", "-s", PCI_BUS_ID]) - # Empty string means a likely wrong PCI_BUS_ID + p = subprocess.check_output(["lsusb", "-s", self.pci_bus_id]) + # Empty string means a likely wrong pci_bus_id if p == "": msg = "ERROR_USB_CHECK" self.exit_gracefully(msg) @@ -189,8 +195,8 @@ def mount_volume(self): ) subprocess.check_call( [ - "sudo", - "chown", + "sudo", + "chown", "-R", "user:user", self.mountpoint, ] ) diff --git a/securedrop_export/main.py b/securedrop_export/main.py index a0b830c..b517705 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -1,17 +1,12 @@ -import os -import shutil -import sys - -from securedrop_export import export +from securedrop_export import export def __main__(submission): submission.extract_tarball() - try: + try: submission.archive_metadata = export.Metadata(submission.tmpdir) except Exception as e: - msg = "ERROR_METADATA_PARSING" - submission.exit_gracefully(msg, e=e) + submission.exit_gracefully("ERROR_METADATA_PARSING") if submission.archive_metadata.is_valid(): if submission.archive_metadata.export_method == "usb-test": @@ -37,5 +32,3 @@ def __main__(submission): submission.print_test_page() else: submission.exit_gracefully("ERROR_ARCHIVE_METADATA") - - diff --git a/tests/sd-export-config-bad-2.json b/tests/sd-export-config-bad-2.json new file mode 100644 index 0000000..879fb83 --- /dev/null +++ b/tests/sd-export-config-bad-2.json @@ -0,0 +1,3 @@ +{ + "pci_bus_id": "two" +} diff --git a/tests/sd-export-config-bad.json b/tests/sd-export-config-bad.json new file mode 100644 index 0000000..f7cbf8d --- /dev/null +++ b/tests/sd-export-config-bad.json @@ -0,0 +1,3 @@ +{ + "pciishf. i3u 2 +} diff --git a/tests/sd-export-config.json b/tests/sd-export-config.json new file mode 100644 index 0000000..d1167cf --- /dev/null +++ b/tests/sd-export-config.json @@ -0,0 +1,3 @@ +{ + "pci_bus_id": "2" +} diff --git a/tests/test_export.py b/tests/test_export.py index 00996c9..82cc824 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -5,7 +5,7 @@ import subprocess import tempfile -from securedrop_export import export +from securedrop_export import export SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa @@ -14,10 +14,45 @@ SAMPLE_OUTPUT_USB="Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa SAMPLE_OUTPUT_USB_ERROR="" SAMPLE_OUTPUT_USB_ERROR2="h\ne\nl\nl\no" +TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") +BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json") +ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json") + + +def test_bad_sd_export_config_invalid_json(capsys): + + expected_message = "ERROR_CONFIG" + with pytest.raises(SystemExit) as sysexit: + submission = export.SDExport("", BAD_TEST_CONFIG) + # A graceful exit means a return code of 0 + assert sysexit.value.code == 0 + + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + assert captured.out == "" + + +def test_bad_sd_export_config_invalid_value(capsys): + + expected_message = "ERROR_CONFIG" + with pytest.raises(SystemExit) as sysexit: + submission = export.SDExport("", ANOTHER_BAD_TEST_CONFIG) + # A graceful exit means a return code of 0 + assert sysexit.value.code == 0 + + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message) + assert captured.out == "" + + +def test_good_sd_export_config(capsys): + submission = export.SDExport("", TEST_CONFIG) + assert submission.pci_bus_id == 2 def test_exit_gracefully_no_exception(capsys): - submission = export.SDExport("testfile") + + submission = export.SDExport("testfile", TEST_CONFIG) test_msg = 'test' with pytest.raises(SystemExit) as sysexit: @@ -32,7 +67,7 @@ def test_exit_gracefully_no_exception(capsys): def test_exit_gracefully_exception(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) test_msg = 'test' with pytest.raises(SystemExit) as sysexit: @@ -48,7 +83,7 @@ def test_exit_gracefully_exception(capsys): def test_empty_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -58,7 +93,7 @@ def test_empty_config(capsys): def test_valid_printer_test_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -70,7 +105,7 @@ def test_valid_printer_test_config(capsys): def test_valid_printer_config(capsys): - submission = export.SDExport("") + submission = export.SDExport("", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -82,7 +117,7 @@ def test_valid_printer_config(capsys): def test_invalid_encryption_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) @@ -97,7 +132,7 @@ def test_invalid_encryption_config(capsys): def test_valid_encryption_config(capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) with open(metadata, "w") as f: @@ -112,7 +147,7 @@ def test_valid_encryption_config(capsys): @mock.patch("subprocess.check_call") def test_popup_message(mocked_call): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) submission.popup_message("hello!") mocked_call.assert_called_once_with([ "notify-send", @@ -124,14 +159,14 @@ def test_popup_message(mocked_call): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BOTHER_PRINTER) def test_get_good_printer_uri(mocked_call): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) result = submission.get_printer_uri() assert result == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) def test_get_bad_printer_uri(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "ERROR_PRINTER_NOT_FOUND" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) @@ -153,7 +188,7 @@ def test_get_bad_printer_uri(mocked_call, capsys): "/tmp/tmpJf83j9/secret.pptx" ]) def test_is_open_office_file(capsys, open_office_paths): - submission = export.SDExport("") + submission = export.SDExport("", TEST_CONFIG) assert submission.is_open_office_file(open_office_paths) @@ -164,13 +199,13 @@ def test_is_open_office_file(capsys, open_office_paths): "/tmp/tmpJf83j9/secret.gpg" ]) def test_is_not_open_office_file(capsys, open_office_paths): - submission = export.SDExport("") + submission = export.SDExport("", TEST_CONFIG) assert not submission.is_open_office_file(open_office_paths) @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB) def test_usb_precheck_connected(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_NOT_CONNECTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -184,7 +219,7 @@ def test_usb_precheck_connected(mocked_call, capsys): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB) def test_usb_precheck_disconnected(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_CONNECTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -198,7 +233,7 @@ def test_usb_precheck_disconnected(mocked_call, capsys): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR) def test_usb_precheck_error(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "ERROR_USB_CHECK" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -212,7 +247,7 @@ def test_usb_precheck_error(mocked_call, capsys): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2) def test_usb_precheck_error_2(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "ERROR_USB_CHECK" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -226,7 +261,7 @@ def test_usb_precheck_error_2(mocked_call, capsys): @mock.patch("subprocess.check_call") def test_luks_precheck_encrypted(mocked_call, capsys): - submission = export.SDExport("testfile") + submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_ENCRYPTED" with pytest.raises(SystemExit) as sysexit: result = submission.check_luks_volume() @@ -234,4 +269,3 @@ def test_luks_precheck_encrypted(mocked_call, capsys): assert sysexit.value.code == 0 captured = capsys.readouterr() assert captured.err == "{}\n".format(expected_message) - From 0a98222a2713e08461ed2def5029bbfa8e42313a Mon Sep 17 00:00:00 2001 From: mickael e Date: Fri, 2 Aug 2019 11:00:10 -0400 Subject: [PATCH 5/7] Code formatting --- securedrop_export/entrypoint.py | 3 +- securedrop_export/export.py | 98 ++++++++++++++++----------------- securedrop_export/main.py | 5 +- tests/test_export.py | 11 ++-- 4 files changed, 60 insertions(+), 57 deletions(-) diff --git a/securedrop_export/entrypoint.py b/securedrop_export/entrypoint.py index c5b2249..a868a15 100755 --- a/securedrop_export/entrypoint.py +++ b/securedrop_export/entrypoint.py @@ -7,6 +7,7 @@ CONFIG_PATH = "/etc/sd-export-config.json" + def start(): my_sub = export.SDExport(sys.argv[1], CONFIG_PATH) try: @@ -17,7 +18,7 @@ def start(): main.__main__(my_sub) # Delete extracted achive from tempfile shutil.rmtree(my_sub.tmpdir) - except Exception as e: + except Exception: # exit with 0 return code otherwise the os will attempt to open # the file with another application msg = "ERROR_GENERIC" diff --git a/securedrop_export/export.py b/securedrop_export/export.py index 7fe7fa5..7e20728 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -19,6 +19,7 @@ BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" + class Metadata(object): """ Object to parse, validate and store json metadata from the sd-export archive. @@ -26,11 +27,11 @@ class Metadata(object): METADATA_FILE = "metadata.json" SUPPORTED_EXPORT_METHODS = [ - "usb-test", # general preflight check - "disk", - "disk-test", # disk preflight test - "printer", - "printer-test" # print test page + "usb-test", # general preflight check + "disk", + "disk-test", # disk preflight test + "printer", + "printer-test", # print test page ] SUPPORTED_ENCRYPTION_METHODS = ["luks"] @@ -44,7 +45,7 @@ def __init__(self, archive_path): self.encryption_method = json_config.get("encryption_method", None) self.encryption_key = json_config.get("encryption_key", None) - except Exception as e: + except Exception: raise def is_valid(self): @@ -58,7 +59,6 @@ def is_valid(self): class SDExport(object): - def __init__(self, archive, config_path): self.device = DEVICE self.mountpoint = MOUNTPOINT @@ -81,10 +81,9 @@ def __init__(self, archive, config_path): with open(config_path) as f: json_config = json.loads(f.read()) self.pci_bus_id = int(json_config.get("pci_bus_id", 2)) - except Exception as e: + except Exception: self.exit_gracefully("ERROR_CONFIG") - def exit_gracefully(self, msg, e=False): """ Utility to print error messages, mostly used during debugging, @@ -108,20 +107,22 @@ def exit_gracefully(self, msg, e=False): # the file with another application sys.exit(0) - def popup_message(self, msg): try: - subprocess.check_call([ - "notify-send", - "--expire-time", "3000", - "--icon", "/usr/share/securedrop/icons/sd-logo.png", - "SecureDrop: {}".format(msg) - ]) + subprocess.check_call( + [ + "notify-send", + "--expire-time", + "3000", + "--icon", + "/usr/share/securedrop/icons/sd-logo.png", + "SecureDrop: {}".format(msg), + ] + ) except subprocess.CalledProcessError as e: msg = "Error sending notification:" self.exit_gracefully(msg, e=e) - def extract_tarball(self): try: with tarfile.open(self.archive) as tar: @@ -130,7 +131,6 @@ def extract_tarball(self): msg = "ERROR_EXTRACTION" self.exit_gracefully(msg) - def check_usb_connected(self): p = subprocess.check_output(["lsusb", "-s", self.pci_bus_id]) # Empty string means a likely wrong pci_bus_id @@ -151,7 +151,6 @@ def check_usb_connected(self): msg = "ERROR_USB_CHECK" self.exit_gracefully(msg) - def check_luks_volume(self): try: # cryptsetup isLuks returns 0 if the device is a luks volume @@ -163,7 +162,6 @@ def check_luks_volume(self): msg = "USB_NO_SUPPORTED_ENCRYPTION" self.exit_gracefully(msg) - def unlock_luks_volume(self, encryption_key): # the luks device is not already unlocked if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)): @@ -179,7 +177,6 @@ def unlock_luks_volume(self, encryption_key): msg = "USB_BAD_PASSPHRASE" self.exit_gracefully(msg) - def mount_volume(self): # mount target not created if not os.path.exists(self.mountpoint): @@ -193,28 +190,21 @@ def mount_volume(self): self.mountpoint, ] ) - subprocess.check_call( - [ - "sudo", - "chown", - "-R", "user:user", self.mountpoint, - ] - ) + subprocess.check_call(["sudo", "chown", "-R", "user:user", self.mountpoint]) except subprocess.CalledProcessError: # clean up - subprocess.check_call(["sudo", "cryptsetup", "luksClose", self.encrypted_device]) + subprocess.check_call( + ["sudo", "cryptsetup", "luksClose", self.encrypted_device] + ) msg = "ERROR_USB_MOUNT" self.exit_gracefully(msg) - def copy_submission(self): # move files to drive (overwrites files with same filename) and unmount drive try: target_path = os.path.join(self.mountpoint, self.target_dirname) subprocess.check_call(["mkdir", target_path]) - export_data = os.path.join( - self.tmpdir, "export_data/" - ) + export_data = os.path.join(self.tmpdir, "export_data/") subprocess.check_call(["cp", "-r", export_data, target_path]) self.popup_message("Files exported successfully to disk.") except (subprocess.CalledProcessError, OSError): @@ -225,21 +215,22 @@ def copy_submission(self): # luks volume, and exit 0 subprocess.check_call(["sync"]) subprocess.check_call(["sudo", "umount", self.mountpoint]) - subprocess.check_call(["sudo", "cryptsetup", "luksClose", self.encrypted_device]) + subprocess.check_call( + ["sudo", "cryptsetup", "luksClose", self.encrypted_device] + ) subprocess.check_call(["rm", "-rf", self.tmpdir]) sys.exit(0) - def wait_for_print(self): # use lpstat to ensure the job was fully transfered to the printer # returns True if print was successful, otherwise will throw exceptions signal.signal(signal.SIGALRM, handler) signal.alarm(self.printer_wait_timeout) printer_idle_string = "printer {} is idle".format(self.printer_name) - while(True): + while True: try: output = subprocess.check_output(["lpstat", "-p", self.printer_name]) - if(printer_idle_string in output.decode("utf-8")): + if printer_idle_string in output.decode("utf-8"): return True else: time.sleep(5) @@ -251,13 +242,12 @@ def wait_for_print(self): self.exit_gracefully(msg) return True - def get_printer_uri(self): # Get the URI via lpinfo and only accept URIs of supported printers printer_uri = "" try: output = subprocess.check_output(["sudo", "lpinfo", "-v"]) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: msg = "ERROR_PRINTER_URI" self.exit_gracefully(msg) @@ -276,13 +266,18 @@ def get_printer_uri(self): # printer url is a make that is unsupported self.exit_gracefully("ERROR_PRINTER_NOT_SUPPORTED") - def install_printer_ppd(self, uri): - # Some drivers don't come with ppd files pre-compiled, we must compile them + # Some drivers don't come with ppd files pre-compiled, we must compile them if "Brother" in uri: try: subprocess.check_call( - ["sudo", "ppdc", self.brlaser_driver, "-d", "/usr/share/cups/model/"] + [ + "sudo", + "ppdc", + self.brlaser_driver, + "-d", + "/usr/share/cups/model/", + ] ) except subprocess.CalledProcessError: msg = "ERROR_PRINTER_DRIVER_INSTALL" @@ -290,7 +285,6 @@ def install_printer_ppd(self, uri): return self.brlaser_ppd # Here, we could support ppd drivers for other makes or models in the future - def setup_printer(self, printer_uri, printer_ppd): try: # Add the printer using lpadmin @@ -316,12 +310,10 @@ def setup_printer(self, printer_uri, printer_ppd): msg = "ERROR_PRINTER_INSTALL" self.exit_gracefully(msg) - def print_test_page(self): self.print_file("/usr/share/cups/data/testprint") self.popup_message("Printing test page") - def print_all_files(self): files_path = os.path.join(self.tmpdir, "export_data/") files = os.listdir(files_path) @@ -333,16 +325,23 @@ def print_all_files(self): msg = "Printing document {} of {}".format(print_count, len(files)) self.popup_message(msg) - def is_open_office_file(self, filename): - OPEN_OFFICE_FORMATS = [".doc", ".docx", ".xls", ".xlsx", - ".ppt", ".pptx", ".odt", ".ods", ".odp"] + OPEN_OFFICE_FORMATS = [ + ".doc", + ".docx", + ".xls", + ".xlsx", + ".ppt", + ".pptx", + ".odt", + ".ods", + ".odp", + ] for extension in OPEN_OFFICE_FORMATS: if os.path.basename(filename).endswith(extension): return True return False - def print_file(self, file_to_print): try: # if the file to print is an (open)office document, we need to call unoconf to convert @@ -359,6 +358,7 @@ def print_file(self, file_to_print): msg = "ERROR_PRINT" self.exit_gracefully(msg) + ## class ends here class TimeoutException(Exception): pass diff --git a/securedrop_export/main.py b/securedrop_export/main.py index b517705..e9ae86d 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -1,12 +1,13 @@ from securedrop_export import export + def __main__(submission): submission.extract_tarball() try: submission.archive_metadata = export.Metadata(submission.tmpdir) - except Exception as e: - submission.exit_gracefully("ERROR_METADATA_PARSING") + except Exception: + submission.exit_gracefully("ERROR_METADATA_PARSING") if submission.archive_metadata.is_valid(): if submission.archive_metadata.export_method == "usb-test": diff --git a/tests/test_export.py b/tests/test_export.py index 82cc824..19be561 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -2,7 +2,6 @@ import os import pytest -import subprocess import tempfile from securedrop_export import export @@ -10,10 +9,10 @@ SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa -SAMPLE_OUTPUT_NO_USB="Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa -SAMPLE_OUTPUT_USB="Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa -SAMPLE_OUTPUT_USB_ERROR="" -SAMPLE_OUTPUT_USB_ERROR2="h\ne\nl\nl\no" +SAMPLE_OUTPUT_NO_USB = "Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa +SAMPLE_OUTPUT_USB = "Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa +SAMPLE_OUTPUT_USB_ERROR = "" +SAMPLE_OUTPUT_USB_ERROR2 = "h\ne\nl\nl\no" TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json") ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json") @@ -263,6 +262,8 @@ def test_usb_precheck_error_2(mocked_call, capsys): def test_luks_precheck_encrypted(mocked_call, capsys): submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_ENCRYPTED" + mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + with pytest.raises(SystemExit) as sysexit: result = submission.check_luks_volume() mocked_exit.assert_called_once_with(expected_message) From 36be369293208aecef2892524a383e7ed143c374 Mon Sep 17 00:00:00 2001 From: mickael e Date: Fri, 2 Aug 2019 14:43:25 -0400 Subject: [PATCH 6/7] Fix parsing and test strings now binary to reflect type of check_output --- securedrop_export/export.py | 17 +++++++++++------ tests/sd-export-config-bad-2.json | 2 +- tests/test_export.py | 13 +++++++------ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/securedrop_export/export.py b/securedrop_export/export.py index 7e20728..8dcff76 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -80,7 +80,9 @@ def __init__(self, archive, config_path): try: with open(config_path) as f: json_config = json.loads(f.read()) - self.pci_bus_id = int(json_config.get("pci_bus_id", 2)) + self.pci_bus_id = json_config.get("pci_bus_id", None) + if self.pci_bus_id is None: + raise except Exception: self.exit_gracefully("ERROR_CONFIG") @@ -132,12 +134,15 @@ def extract_tarball(self): self.exit_gracefully(msg) def check_usb_connected(self): - p = subprocess.check_output(["lsusb", "-s", self.pci_bus_id]) - # Empty string means a likely wrong pci_bus_id - if p == "": - msg = "ERROR_USB_CHECK" + + # If the USB is not attached via qvm-usb attach, lsusb will return empty string and a + # return code of 1 + try: + p = subprocess.check_output(["lsusb", "-s", "{}:".format(self.pci_bus_id)]) + except subprocess.CalledProcessError: + msg = "ERROR_USB_CONFIGURATION" self.exit_gracefully(msg) - n_usb = len(p.rstrip().split("\n")) + n_usb = len(p.decode("utf-8").rstrip().split("\n")) # If there is one device, it is the root hub. if n_usb == 1: msg = "USB_NOT_CONNECTED" diff --git a/tests/sd-export-config-bad-2.json b/tests/sd-export-config-bad-2.json index 879fb83..f69e25b 100644 --- a/tests/sd-export-config-bad-2.json +++ b/tests/sd-export-config-bad-2.json @@ -1,3 +1,3 @@ { - "pci_bus_id": "two" + "no_pci_bus_id": "nope" } diff --git a/tests/test_export.py b/tests/test_export.py index 19be561..d0a2946 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -2,6 +2,7 @@ import os import pytest +import subprocess import tempfile from securedrop_export import export @@ -9,10 +10,10 @@ SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa -SAMPLE_OUTPUT_NO_USB = "Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa -SAMPLE_OUTPUT_USB = "Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa -SAMPLE_OUTPUT_USB_ERROR = "" -SAMPLE_OUTPUT_USB_ERROR2 = "h\ne\nl\nl\no" +SAMPLE_OUTPUT_NO_USB = b"Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa +SAMPLE_OUTPUT_USB = b"Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa +SAMPLE_OUTPUT_USB_ERROR = b"" +SAMPLE_OUTPUT_USB_ERROR2 = b"h\ne\nl\nl\no" TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json") ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json") @@ -46,7 +47,7 @@ def test_bad_sd_export_config_invalid_value(capsys): def test_good_sd_export_config(capsys): submission = export.SDExport("", TEST_CONFIG) - assert submission.pci_bus_id == 2 + assert submission.pci_bus_id == "2" def test_exit_gracefully_no_exception(capsys): @@ -230,7 +231,7 @@ def test_usb_precheck_disconnected(mocked_call, capsys): assert captured.err == "{}\n".format(expected_message) -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR) +@mock.patch("subprocess.check_output", return_code=1) def test_usb_precheck_error(mocked_call, capsys): submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "ERROR_USB_CHECK" From 2a70172e67a008efeeecf88c4fe8f8edbdb6e320 Mon Sep 17 00:00:00 2001 From: mickael e Date: Tue, 6 Aug 2019 17:03:10 -0400 Subject: [PATCH 7/7] Update return code when printer drivers are not available --- securedrop_export/export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/securedrop_export/export.py b/securedrop_export/export.py index 8dcff76..6c3ac34 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -285,7 +285,7 @@ def install_printer_ppd(self, uri): ] ) except subprocess.CalledProcessError: - msg = "ERROR_PRINTER_DRIVER_INSTALL" + msg = "ERROR_PRINTER_DRIVER_UNAVAILBLE" self.exit_gracefully(msg) return self.brlaser_ppd # Here, we could support ppd drivers for other makes or models in the future