From 0307265697d9e21460553ec098c388f99a294f53 Mon Sep 17 00:00:00 2001 From: Noah Vesely Date: Sun, 13 Nov 2016 00:50:55 -0800 Subject: [PATCH] Rebase + refactor tests for new download functionality This rebases the new download unread and download all functionality in the index view of the journalist app on the current develop, and in the process does some minor refactoring of the test suite. * Organizes imports: first come external package imports, then the SECUREDROP_ENV environment variable is set, and finally the SD module imports. They are now alphabetized. Also removes some unused imports, and additional addition or removal was done as necessary to support the refactor. * Prefers use of APIs for object construction versus explicit assignment. For example, a call to :function:`crypto_util.genrandomid()` is preferred to the assignment `sid = 'somestring'`. * Class-based rewrite of common.py: in order to better organize the functionality, and encourage better OOP and DRY practices, all the loose functions in `securedrop/tests/common.py` have become (mostly) static or class methods various organizational classes. This allowed us to write a lot of code. * More accurate test method names: a lot of test method names were vaguely descriptive. A common motif was to just end test names in 'success.' It's better to name a method `test_admin_login_redirects_to_index` if it just tests that after successful login you're redirected to the index page, than `test_admin_login_success`. There are many things that should happen on success--:obj:`flask.g` should be set correctly, calls to the database should be made, session should be set in a client-side cookie, and that cookie should be set--the list goes on. Let's be specific about which thing that's supposed to happen happened successfully. (Side note: we do really need to do more comprehensive tests in a lot of places where we only tests that the app redirects correctly, instead of checking that all state and context is modified--or not--in the expected way). * Better async support: The new :class:`tests.common.Async` provides a little bit better asynchronous functionality for a variety of situations, and deprecates the use of sleep statements. * General cleanup: PEP8 stuff, and nicer formatting--some other minor touches. --- securedrop/tests/common.py | 333 +++++-- .../tests/functional/functional_test.py | 29 +- securedrop/tests/test_journalist.py | 69 +- securedrop/tests/test_unit_crypto_util.py | 9 +- securedrop/tests/test_unit_integration.py | 357 ++++---- securedrop/tests/test_unit_journalist.py | 836 ++++++++---------- securedrop/tests/test_unit_source.py | 178 ++-- securedrop/tests/test_unit_store.py | 49 +- 8 files changed, 977 insertions(+), 883 deletions(-) diff --git a/securedrop/tests/common.py b/securedrop/tests/common.py index 5d78211da8..74d44cf845 100644 --- a/securedrop/tests/common.py +++ b/securedrop/tests/common.py @@ -1,120 +1,291 @@ +# -*- coding: utf-8 -*- + +from functools import wraps +import gnupg +import mock import os import shutil -import uuid import subprocess - -import gnupg +import time # Set environment variable so config.py uses a test environment os.environ['SECUREDROP_ENV'] = 'test' + import config -from db import init_db, db_session, Journalist, Reply, Source, Submission +from db import db_session, init_db, Journalist, Reply, Source, Submission import crypto_util +import store -# TODO: the PID file for the redis worker is hard-coded below. -# Ideally this constant would be provided by a test harness. -# It has been intentionally omitted from `config.py.example` -# in order to isolate the test vars from prod vars. -# When refactoring the test suite, the TEST_WORKER_PIDFILE -# TEST_WORKER_PIDFILE is also hard-coded in `manage.py`. -TEST_WORKER_PIDFILE = "/tmp/securedrop_test_worker.pid" +class TestJournalist: + """Test fixtures for working with :class:`db.Journalist`. + """ + @staticmethod + def init_journalist(is_admin=False): + """Initialize a journalist into the database. Return their + :class:`db.Journalist` object and password string. -def clean_root(): - shutil.rmtree(config.SECUREDROP_DATA_ROOT) + :param bool is_admin: Whether the user is an admin. + :returns: A 2-tuple. The first entry, the :class:`db.Journalist` + initialized. The second, their password string. + """ + username = crypto_util.genrandomid() + user_pw = crypto_util.genrandomid() + user = Journalist(username, user_pw, is_admin) + db_session.add(user) + db_session.commit() + return user, user_pw -def create_directories(): - # Create directories for the file store and the GPG keyring - for d in (config.SECUREDROP_DATA_ROOT, config.STORE_DIR, - config.GPG_KEY_DIR, config.TEMP_DIR): - if not os.path.isdir(d): - os.mkdir(d) + @staticmethod + def init_admin(): + return TestJournalist.init_journalist(True) + @staticmethod + def mock_verify_token(self): + patcher = mock.patch('db.Journalist.verify_token') + self.addCleanup(patcher.stop) + self.mock_journalist_verify_token = patcher.start() + self.mock_journalist_verify_token.return_value = True -def init_gpg(): - # Initialize the GPG keyring - gpg = gnupg.GPG(homedir=config.GPG_KEY_DIR) - # Import the journalist key for testing (faster to import a pre-generated - # key than to gen a new one every time) - for keyfile in ("test_journalist_key.pub", "test_journalist_key.sec"): - gpg.import_keys(open(keyfile).read()) - return gpg +class TestSource: + """Test fixtures for working with :class:`db.Source`. + """ + @staticmethod + def init_source(): + """Initialize a source: create their database record, the + filesystem directory that stores their submissions & replies, + and their GPG key encrypted with their codename. -def create_file(filename): - dirname = os.path.dirname(filename) - if not os.path.exists(dirname): - os.makedirs(dirname) - with open(filename, 'w') as fp: - fp.write(str(uuid.uuid4())) + :returns: A 2-tuple. The first entry, the :class:`db.Source` + initialized. The second, their codename string. + """ + # Create source identity and database record + codename = crypto_util.genrandomid() + filesystem_id = crypto_util.hash_codename(codename) + journalist_filename = crypto_util.display_id() + source = Source(filesystem_id, journalist_filename) + db_session.add(source) + db_session.commit() + # Create the directory to store their submissions and replies + os.mkdir(store.path(source.filesystem_id)) + # Generate their key, blocking for as long as necessary + crypto_util.genkeypair(source.filesystem_id, codename) + return source, codename -def setup_test_docs(sid, files): - filenames = [os.path.join(config.STORE_DIR, sid, file) for file in files] + # NOTE: this method is potentially dangerous to rely on for now due + # to the fact flask_testing.TestCase only uses on request context + # per method (see + # https://github.com/freedomofpress/securedrop/issues/1444). + @staticmethod + def new_codename(client, session): + """Helper function to go through the "generate codename" flow. + """ + with client as c: + c.get('/generate') + codename = session['codename'] + c.post('/create') + return codename - for filename in filenames: - create_file(filename) - # Add Submission to the db - source = Source.query.filter(Source.filesystem_id == sid).one() - submission = Submission(source, os.path.basename(filename)) - db_session.add(submission) +class TestSubmission: + """Test fixtures for working with :class:`db.Submission`. + """ + @staticmethod + def submit(source, num_submissions): + """Generates and submits *num_submissions* + :class:`db.Submission`s on behalf of a :class:`db.Source` + *source*. + + :param db.Source source: The source on who's behalf to make + submissions. + + :param int num_submissions: Number of random-data submissions + to make. + + :returns: A list of the :class:`db.Submission`s submitted. + """ + assert num_submissions >= 1 + submissions = [] + for _ in range(num_submissions): + source.interaction_count += 1 + fpath = store.save_message_submission(source.filesystem_id, + source.interaction_count, + source.journalist_filename, + str(os.urandom(1))) + submission = Submission(source, fpath) + submissions.append(submission) + db_session.add(submission) + + db_session.commit() + return submissions + + @staticmethod + def mark_downloaded(*submissions): + for submission in submissions: + submission.downloaded = True db_session.commit() - return filenames +class TestReply: + """Test fixtures for working with :class:`db.Reply`. + """ + @staticmethod + def reply(journalist, source, num_replies): + """Generates and submits *num_replies* replies to *source* + :class:`db.Source`. -def setup_test_replies(sid, journo_id, files): - filenames = [os.path.join(config.STORE_DIR, sid, file) for file in files] + :param db.Journalist journalist: The journalist to write the + reply from. - for filename in filenames: - create_file(filename) + :param db.Source source: The source to send the reply to. + + :param int num_replies: Number of random-data replies to make. + + :returns: A list of the :class:`db.Reply`s submitted. + """ + assert num_replies >= 1 + replies = [] + for _ in range(num_replies): + source.interaction_count += 1 + fname = "{}-{}-reply.gpg".format(source.interaction_count, + source.journalist_filename) + crypto_util.encrypt(str(os.urandom(1)), + [ + crypto_util.getkey(source.filesystem_id), + config.JOURNALIST_KEY + ], + store.path(source.filesystem_id, fname)) + reply = Reply(journalist, source, fname) + replies.append(reply) + db_session.add(reply) - # Add Reply to the db - source = Source.query.filter(Source.filesystem_id == sid).one() - journalist = Journalist.query.filter(Journalist.id == journo_id).one() - reply = Reply(journalist, source, os.path.basename(filename)) - db_session.add(reply) db_session.commit() + return replies + + +class SetUp: + """Test fixtures for initializing a generic test environment. + """ + # TODO: the PID file for the redis worker is hard-coded below. + # Ideally this constant would be provided by a test harness. + # It has been intentionally omitted from `config.py.example` + # in order to isolate the test vars from prod vars. + # When refactoring the test suite, the test_worker_pidfile + # test_worker_pidfile is also hard-coded in `manage.py`. + test_worker_pidfile = "/tmp/securedrop_test_worker.pid" + + @staticmethod + def create_directories(): + # Create directories for the file store and the GPG keyring + for d in (config.SECUREDROP_DATA_ROOT, config.STORE_DIR, + config.GPG_KEY_DIR, config.TEMP_DIR): + if not os.path.isdir(d): + os.mkdir(d) + + @staticmethod + def init_gpg(): + # Initialize the GPG keyring + gpg = gnupg.GPG(homedir=config.GPG_KEY_DIR) + # Import the journalist key for testing (faster to import a pre-generated + # key than to gen a new one every time) + for keyfile in ("test_journalist_key.pub", "test_journalist_key.sec"): + gpg.import_keys(open(keyfile).read()) + return gpg + + @classmethod + def setup(cls): + """Set up the file system, GPG, and database.""" + SetUp.create_directories() + SetUp.init_gpg() + init_db() + # Do tests that should always run on app startup + crypto_util.do_runtime_tests() + # Start the Python-RQ worker if it's not already running + if not os.path.exists(cls.test_worker_pidfile): + subprocess.Popen(["rqworker", + "-P", config.SECUREDROP_ROOT, + "--pid", cls.test_worker_pidfile]) + + +class TearDown: + """Test fixtures for tearing down a generic test environment. + """ + @staticmethod + def teardown(): + shutil.rmtree(config.SECUREDROP_DATA_ROOT) + + # TODO: now that SD has a logout button, we can deprecate use of + # this function. + @staticmethod + def logout(test_client): + with test_client.session_transaction() as sess: + sess.clear() + + +class Async: + """Test fixtures for use with asynchronous processes. + """ + redis_success_return_value = 'success' - return filenames + @classmethod + def wait_for_redis_worker(cls, job, timeout=5): + """Raise an error if the Redis job doesn't complete successfully + before a timeout. + :param rq.job.Job job: A Redis job to wait for. -def new_codename(client, session): - """Helper function to go through the "generate codename" flow""" - with client as c: - rv = c.get('/generate') - codename = session['codename'] - rv = c.post('/create') - return codename + :param int timeout: Seconds to wait for the job to finish. + :raises: An :exc:`AssertionError`. + """ + start_time = time.time() + while time.time() - start_time < timeout: + if job.result == cls.redis_success_return_value: + return + elif job.result not in (None, cls.redis_success_return_value): + assert False, 'Redis worker failed!' + assert False, 'Redis worker timed out!' -def shared_setup(): - """Set up the file system, GPG, and database""" - create_directories() - init_gpg() - init_db() + @staticmethod + def wait_for_function_result(f, timeout=5): + """Wraps function *f*: calls *f* until *f* returns a non-None + value, or timeout is reached. - # Do tests that should always run on app startup - crypto_util.do_runtime_tests() + :param function f: The function to run. - # Start the Python-RQ worker if it's not already running - if not os.path.exists(TEST_WORKER_PIDFILE): - subprocess.Popen(["rqworker", "-P", config.SECUREDROP_ROOT, - "--pid", TEST_WORKER_PIDFILE]) + :param int timeout: Seconds to wait for the function to return. + :raises: An :exc:`AssertionError`. + """ + @wraps(f) + def wrapper(*args, **kwargs): + start_time = time.time() + while time.time() - start_time < timeout: + result = f(*args, **kwargs) + if result is not None: + return result + assert False, 'Asynchronous function timeout!' + return wrapper -def shared_teardown(): - clean_root() + @staticmethod + def wait_for_assertion(assertion_expression, timeout=5): + """Calls an assertion_expression repeatedly, until the assertion + passes or a timeout is reached. + :param assertion_expression: An assertion expression. Generally + a call to a + :class:`unittest.TestCase` method. -def logout(test_client): - # See http://flask.pocoo.org/docs/testing/#accessing-and-modifying-sessions - # This is necessary because SecureDrop doesn't have a logout button, so a - # user is logged in until they close the browser, which clears the session. - # For testing, this function simulates closing the browser at places - # where a source is likely to do so (for instance, between submitting a - # document and checking for a journalist reply). - with test_client.session_transaction() as sess: - sess.clear() + :param int timeout: Seconds to wait for the function to return. + """ + start_time = time.time() + while time.time() - start_time < timeout: + try: + return assertion_expression() + except AssertionError: + pass + # one more try, which will raise any errors if they are outstanding + return assertion_expression() diff --git a/securedrop/tests/functional/functional_test.py b/securedrop/tests/functional/functional_test.py index e6b82b8f7c..014b1d1679 100644 --- a/securedrop/tests/functional/functional_test.py +++ b/securedrop/tests/functional/functional_test.py @@ -1,23 +1,26 @@ -import unittest +# -*- coding: utf-8 -*- + +import gnupg +import os +from multiprocessing import Process from selenium import webdriver -from selenium.webdriver.firefox import firefox_binary from selenium.common.exceptions import WebDriverException -from multiprocessing import Process -import socket +from selenium.webdriver.firefox import firefox_binary import shutil -import os -import gnupg -import urllib2 +import socket import sys +import unittest +import urllib2 # Set environment variable so config.py uses a test environment os.environ['SECUREDROP_ENV'] = 'test' -import config +import db +import config import source import journalist -from tests import common +from tests.common import SetUp, TearDown import urllib2 import signal @@ -54,9 +57,9 @@ def setUp(self): signal.signal(signal.SIGUSR1, lambda _, s: traceback.print_stack(s)) - common.create_directories() - self.gpg = common.init_gpg() - common.init_db() + SetUp.create_directories() + self.gpg = SetUp.init_gpg() + db.init_db() source_port = self._unused_port() journalist_port = self._unused_port() @@ -94,7 +97,7 @@ def start_journalist_server(): self.secret_message = 'blah blah blah' def tearDown(self): - common.clean_root() + TearDown.teardown() self.driver.quit() self.source_process.terminate() self.journalist_process.terminate() diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index db988886bc..948bb75859 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -1,12 +1,18 @@ -import unittest +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import os from mock import patch, ANY, MagicMock +import unittest +# Set environment variable so config.py uses a test environment +os.environ['SECUREDROP_ENV'] = 'test' + +from common import SetUp, TearDown, TestJournalist +from db import db_session, InvalidPasswordLength, Journalist import journalist -import common -from db import Journalist, InvalidPasswordLength, db_session -class TestJournalist(unittest.TestCase): +class TestJournalistApp(unittest.TestCase): def setUp(self): journalist.logged_in = MagicMock() @@ -18,10 +24,15 @@ def setUp(self): journalist.get_docs = MagicMock() journalist.get_or_else = MagicMock() + def _set_up_request(self, cols_selected, action): + journalist.request.form.__contains__.return_value = True + journalist.request.form.getlist = MagicMock(return_value=cols_selected) + journalist.request.form.__getitem__.return_value = action + @patch("journalist.col_delete") def test_col_process_delegates_to_col_delete(self, col_delete): cols_selected = ['source_id'] - self.set_up_request(cols_selected, 'delete') + self._set_up_request(cols_selected, 'delete') journalist.col_process() @@ -30,7 +41,7 @@ def test_col_process_delegates_to_col_delete(self, col_delete): @patch("journalist.col_star") def test_col_process_delegates_to_col_star(self, col_star): cols_selected = ['source_id'] - self.set_up_request(cols_selected, 'star') + self._set_up_request(cols_selected, 'star') journalist.col_process() @@ -39,7 +50,7 @@ def test_col_process_delegates_to_col_star(self, col_star): @patch("journalist.col_un_star") def test_col_process_delegates_to_col_un_star(self, col_un_star): cols_selected = ['source_id'] - self.set_up_request(cols_selected, 'un-star') + self._set_up_request(cols_selected, 'un-star') journalist.col_process() @@ -48,7 +59,7 @@ def test_col_process_delegates_to_col_un_star(self, col_un_star): @patch("journalist.abort") def test_col_process_returns_404_with_bad_action(self, abort): cols_selected = ['source_id'] - self.set_up_request(cols_selected, 'something-random') + self._set_up_request(cols_selected, 'something-random') journalist.col_process() @@ -67,10 +78,6 @@ def test_col_un_star_call_db(self, db_session): db_session.commit.assert_called_with() - def set_up_request(self, cols_selected, action): - journalist.request.form.__contains__.return_value = True - journalist.request.form.getlist = MagicMock(return_value=cols_selected) - journalist.request.form.__getitem__.return_value = action @classmethod def tearDownClass(cls): @@ -78,33 +85,19 @@ def tearDownClass(cls): # break other tests reload(journalist) - +seshy = None class TestJournalistLogin(unittest.TestCase): def setUp(self): - common.shared_setup() + SetUp.setup() # Patch the two-factor verification so it always succeeds - patcher = patch('db.Journalist.verify_token') - self.addCleanup(patcher.stop) - self.mock_journalist_verify_token = patcher.start() - self.mock_journalist_verify_token.return_value = True - - self.username = "test user" - self.password = "test password" - self.user = Journalist( - username=self.username, - password=self.password) - db_session.add(self.user) - db_session.commit() - - # Use a patched login function to avoid dealing with two-factor tokens - # (which are being ignored here anyway) - self.login = lambda username, password: \ - Journalist.login(username, password, "") + TestJournalist.mock_verify_token(self) + + self.user, self.user_pw = TestJournalist.init_journalist() def tearDown(self): - common.shared_teardown() + TearDown.teardown() # TODO: figure out why this is necessary here, but unnecessary in all # of the tests in `tests/test_unit_*.py`. Without this, the session # continues to return values even if the underlying database is deleted @@ -113,19 +106,16 @@ def tearDown(self): @patch('db.Journalist._scrypt_hash') @patch('db.Journalist.valid_password', return_value=True) - def test_login_with_valid_length_password_calls_scrypt( - self, mock_scrypt_hash, mock_valid_password): - self.login(self.username, self.password) + def test_login_calls_scrypt(self, mock_scrypt_hash, mock_valid_password): + Journalist.login(self.user.username, self.user_pw, 'mocked') self.assertTrue(mock_scrypt_hash.called, "Failed to call _scrypt_hash for password w/ valid length") @patch('db.Journalist._scrypt_hash') - def test_login_with_invalid_length_password_doesnt_call_scrypt( - self, mock_scrypt_hash): - print "test_login_with_invalid_length_password_calls_scrypt" + def test_invalid_login_doesnt_call_scrypt(self, mock_scrypt_hash): invalid_pw = 'a'*(Journalist.MAX_PASSWORD_LEN + 1) with self.assertRaises(InvalidPasswordLength): - self.login(self.username, invalid_pw) + Journalist.login(self.user.username, invalid_pw, 'mocked') self.assertFalse(mock_scrypt_hash.called, "Called _scrypt_hash for password w/ invalid length") @@ -134,4 +124,3 @@ def tearDownClass(cls): # Reset the module variables that were changed to mocks so we don't # break other tests reload(journalist) - diff --git a/securedrop/tests/test_unit_crypto_util.py b/securedrop/tests/test_unit_crypto_util.py index 083fea9007..06a059cd9c 100644 --- a/securedrop/tests/test_unit_crypto_util.py +++ b/securedrop/tests/test_unit_crypto_util.py @@ -1,11 +1,14 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- + import os import unittest + # Set environment variable so config.py uses a test environment os.environ['SECUREDROP_ENV'] = 'test' + import config -import common +from common import SetUp, TearDown import crypto_util @@ -14,10 +17,10 @@ class TestCryptoUtil(unittest.TestCase): """The set of tests for crypto_util.py.""" def setUp(self): - common.shared_setup() + SetUp.setup() def tearDown(self): - common.shared_teardown() + TearDown.teardown() def test_clean(self): with self.assertRaises(crypto_util.CryptoException): diff --git a/securedrop/tests/test_unit_integration.py b/securedrop/tests/test_unit_integration.py index a289630894..0444a71b21 100644 --- a/securedrop/tests/test_unit_integration.py +++ b/securedrop/tests/test_unit_integration.py @@ -1,40 +1,32 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- + +from bs4 import BeautifulSoup +from cStringIO import StringIO +from flask import session, g, escape +import gnupg +import gzip +import mock import os -import unittest import re -from cStringIO import StringIO -import zipfile -from time import sleep -import tempfile import shutil +import tempfile import time -import gzip - -import mock - -import gnupg -from flask import session, g, escape -from bs4 import BeautifulSoup +import unittest +import zipfile # Set environment variable so config.py uses a test environment os.environ['SECUREDROP_ENV'] = 'test' + +from common import Async, SetUp, TearDown import config import crypto_util -import source -import journalist -import common from db import db_session, Journalist +import journalist +import source import store - -def _block_on_reply_keypair_gen(codename): - sid = crypto_util.hash_codename(codename) - while not crypto_util.getkey(sid): - sleep(0.1) - - class TestIntegration(unittest.TestCase): def _login_user(self): @@ -44,21 +36,8 @@ def _login_user(self): token='mocked'), follow_redirects=True) - def _wait_for(self, function_with_assertion, timeout=5): - """Polling wait for an arbitrary assertion.""" - # Thanks to - # http://chimera.labs.oreilly.com/books/1234000000754/ch20.html#_a_common_selenium_problem_race_conditions - start_time = time.time() - while time.time() - start_time < timeout: - try: - return function_with_assertion() - except AssertionError: - time.sleep(0.1) - # one more try, which will raise any errors if they are outstanding - return function_with_assertion() - def setUp(self): - common.shared_setup() + SetUp.setup() self.source_app = source.app.test_client() self.journalist_app = journalist.app.test_client() @@ -81,30 +60,30 @@ def setUp(self): self._login_user() def tearDown(self): - common.shared_teardown() + TearDown.teardown() def test_submit_message(self): """When a source creates an account, test that a new entry appears in the journalist interface""" test_msg = "This is a test message." with self.source_app as source_app: - rv = source_app.get('/generate') - rv = source_app.post('/create', follow_redirects=True) + resp = source_app.get('/generate') + resp = source_app.post('/create', follow_redirects=True) codename = session['codename'] sid = g.sid # redirected to submission form - rv = self.source_app.post('/submit', data=dict( + resp = self.source_app.post('/submit', data=dict( msg=test_msg, fh=(StringIO(''), ''), ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - common.logout(source_app) + self.assertEqual(resp.status_code, 200) + source_app.get('/logout') # Request the Document Interface index - rv = self.journalist_app.get('/') - self.assertEqual(rv.status_code, 200) - self.assertIn("Sources", rv.data) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get('/') + self.assertEqual(resp.status_code, 200) + self.assertIn("Sources", resp.data) + soup = BeautifulSoup(resp.data) # The source should have a "download unread" link that says "1 unread" col = soup.select('ul#cols > li')[0] @@ -112,58 +91,58 @@ def test_submit_message(self): self.assertIn("1 unread", unread_span.get_text()) col_url = soup.select('ul#cols > li a')[0]['href'] - rv = self.journalist_app.get(col_url) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get(col_url) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) submission_url = soup.select('ul#submissions li a')[0]['href'] self.assertIn("-msg", submission_url) span = soup.select('ul#submissions li span.info span')[0] self.assertRegexpMatches(span['title'], "\d+ bytes") - rv = self.journalist_app.get(submission_url) - self.assertEqual(rv.status_code, 200) - decrypted_data = self.gpg.decrypt(rv.data) + resp = self.journalist_app.get(submission_url) + self.assertEqual(resp.status_code, 200) + decrypted_data = self.gpg.decrypt(resp.data) self.assertTrue(decrypted_data.ok) self.assertEqual(decrypted_data.data, test_msg) # delete submission - rv = self.journalist_app.get(col_url) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get(col_url) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) doc_name = soup.select( 'ul > li > input[name="doc_names_selected"]')[0]['value'] - rv = self.journalist_app.post('/bulk', data=dict( + resp = self.journalist_app.post('/bulk', data=dict( action='confirm_delete', sid=sid, doc_names_selected=doc_name )) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) - self.assertIn("The following file has been selected for", rv.data) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) + self.assertIn("The following file has been selected for", resp.data) # confirm delete submission doc_name = soup.select doc_name = soup.select( 'ul > li > input[name="doc_names_selected"]')[0]['value'] - rv = self.journalist_app.post('/bulk', data=dict( + resp = self.journalist_app.post('/bulk', data=dict( action='delete', sid=sid, doc_names_selected=doc_name, ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) - self.assertIn("Submission deleted.", rv.data) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) + self.assertIn("Submission deleted.", resp.data) # confirm that submission deleted and absent in list of submissions - rv = self.journalist_app.get(col_url) - self.assertEqual(rv.status_code, 200) - self.assertIn("No documents to display.", rv.data) + resp = self.journalist_app.get(col_url) + self.assertEqual(resp.status_code, 200) + self.assertIn("No documents to display.", resp.data) # the file should be deleted from the filesystem # since file deletion is handled by a polling worker, this test needs # to wait for the worker to get the job and execute it - self._wait_for( + Async.wait_for_assertion( lambda: self.assertFalse(os.path.exists(store.path(sid, doc_name))) ) @@ -173,22 +152,22 @@ def test_submit_file(self): test_filename = "test.txt" with self.source_app as source_app: - rv = source_app.get('/generate') - rv = source_app.post('/create', follow_redirects=True) + resp = source_app.get('/generate') + resp = source_app.post('/create', follow_redirects=True) codename = session['codename'] sid = g.sid # redirected to submission form - rv = self.source_app.post('/submit', data=dict( + resp = self.source_app.post('/submit', data=dict( msg="", fh=(StringIO(test_file_contents), test_filename), ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - common.logout(source_app) + self.assertEqual(resp.status_code, 200) + source_app.get('/logout') - rv = self.journalist_app.get('/') - self.assertEqual(rv.status_code, 200) - self.assertIn("Sources", rv.data) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get('/') + self.assertEqual(resp.status_code, 200) + self.assertIn("Sources", resp.data) + soup = BeautifulSoup(resp.data) # The source should have a "download unread" link that says "1 unread" col = soup.select('ul#cols > li')[0] @@ -196,17 +175,17 @@ def test_submit_file(self): self.assertIn("1 unread", unread_span.get_text()) col_url = soup.select('ul#cols > li a')[0]['href'] - rv = self.journalist_app.get(col_url) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get(col_url) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) submission_url = soup.select('ul#submissions li a')[0]['href'] self.assertIn("-doc", submission_url) span = soup.select('ul#submissions li span.info span')[0] self.assertRegexpMatches(span['title'], "\d+ bytes") - rv = self.journalist_app.get(submission_url) - self.assertEqual(rv.status_code, 200) - decrypted_data = self.gpg.decrypt(rv.data) + resp = self.journalist_app.get(submission_url) + self.assertEqual(resp.status_code, 200) + decrypted_data = self.gpg.decrypt(resp.data) self.assertTrue(decrypted_data.ok) sio = StringIO(decrypted_data.data) @@ -215,43 +194,43 @@ def test_submit_file(self): self.assertEqual(unzipped_decrypted_data, test_file_contents) # delete submission - rv = self.journalist_app.get(col_url) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get(col_url) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) doc_name = soup.select( 'ul > li > input[name="doc_names_selected"]')[0]['value'] - rv = self.journalist_app.post('/bulk', data=dict( + resp = self.journalist_app.post('/bulk', data=dict( action='confirm_delete', sid=sid, doc_names_selected=doc_name )) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) - self.assertIn("The following file has been selected for", rv.data) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) + self.assertIn("The following file has been selected for", resp.data) # confirm delete submission doc_name = soup.select doc_name = soup.select( 'ul > li > input[name="doc_names_selected"]')[0]['value'] - rv = self.journalist_app.post('/bulk', data=dict( + resp = self.journalist_app.post('/bulk', data=dict( action='delete', sid=sid, doc_names_selected=doc_name, ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) - self.assertIn("Submission deleted.", rv.data) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) + self.assertIn("Submission deleted.", resp.data) # confirm that submission deleted and absent in list of submissions - rv = self.journalist_app.get(col_url) - self.assertEqual(rv.status_code, 200) - self.assertIn("No documents to display.", rv.data) + resp = self.journalist_app.get(col_url) + self.assertEqual(resp.status_code, 200) + self.assertIn("No documents to display.", resp.data) # the file should be deleted from the filesystem # since file deletion is handled by a polling worker, this test needs # to wait for the worker to get the job and execute it - self._wait_for( + Async.wait_for_assertion( lambda: self.assertFalse(os.path.exists(store.path(sid, doc_name))) ) @@ -308,89 +287,97 @@ def _can_decrypt_with_key(self, msg, key_fpr, passphrase=None): # We have to clean up the temporary GPG dir shutil.rmtree(gpg_tmp_dir) + def _block_on_reply_keypair_gen(self, filesystem_id, timeout): + """Wait for the asymmetric keypair associated with the + :class:`db.Source` with :attr:`db.Source.filesystem_id` + *filesystem_id* to appear in the GPG keyring, or timeout after + *timeout* seconds.""" + def get_reply_keypair(filesystem_id): + return crypto_util.getkey(filesystem_id) + return Async.wait_for_function_result(get_reply_keypair) + def helper_test_reply(self, test_reply, expected_success=True): test_msg = "This is a test message." with self.source_app as source_app: - rv = source_app.get('/generate') - rv = source_app.post('/create', follow_redirects=True) + resp = source_app.get('/generate') + resp = source_app.post('/create', follow_redirects=True) codename = session['codename'] sid = g.sid # redirected to submission form - rv = source_app.post('/submit', data=dict( + resp = source_app.post('/submit', data=dict( msg=test_msg, fh=(StringIO(''), ''), ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) self.assertFalse(g.source.flagged) - common.logout(source_app) + source_app.get('/logout') - rv = self.journalist_app.get('/') - self.assertEqual(rv.status_code, 200) - self.assertIn("Sources", rv.data) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get('/') + self.assertEqual(resp.status_code, 200) + self.assertIn("Sources", resp.data) + soup = BeautifulSoup(resp.data) col_url = soup.select('ul#cols > li a')[0]['href'] - rv = self.journalist_app.get(col_url) - self.assertEqual(rv.status_code, 200) + resp = self.journalist_app.get(col_url) + self.assertEqual(resp.status_code, 200) with self.source_app as source_app: - rv = source_app.post('/login', data=dict( + resp = source_app.post('/login', data=dict( codename=codename), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) self.assertFalse(g.source.flagged) - common.logout(source_app) + source_app.get('/logout') with self.journalist_app as journalist_app: - rv = journalist_app.post('/flag', data=dict( + resp = journalist_app.post('/flag', data=dict( sid=sid)) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) with self.source_app as source_app: - rv = source_app.post('/login', data=dict( + resp = source_app.post('/login', data=dict( codename=codename), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) self.assertTrue(g.source.flagged) source_app.get('/lookup') self.assertTrue(g.source.flagged) - common.logout(source_app) + source_app.get('/logout') - # Block until the reply keypair has been generated, so we can test - # sending a reply - _block_on_reply_keypair_gen(codename) + # Block up to 15s for the reply keypair, so we can test sending a reply + self._block_on_reply_keypair_gen(sid, 15) # Create 2 replies to test deleting on journalist and source interface for i in range(2): - rv = self.journalist_app.post('/reply', data=dict( + resp = self.journalist_app.post('/reply', data=dict( sid=sid, msg=test_reply ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) if not expected_success: pass else: - self.assertIn("Thanks! Your reply has been stored.", rv.data) + self.assertIn("Thanks! Your reply has been stored.", resp.data) with self.journalist_app as journalist_app: - rv = journalist_app.get(col_url) - self.assertIn("reply-", rv.data) + resp = journalist_app.get(col_url) + self.assertIn("reply-", resp.data) - soup = BeautifulSoup(rv.data) + soup = BeautifulSoup(resp.data) # Download the reply and verify that it can be decrypted with the # journalist's key as well as the source's reply key sid = soup.select('input[name="sid"]')[0]['value'] checkbox_values = [ soup.select('input[name="doc_names_selected"]')[1]['value']] - rv = self.journalist_app.post('/bulk', data=dict( + resp = self.journalist_app.post('/bulk', data=dict( sid=sid, action='download', doc_names_selected=checkbox_values ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) - zf = zipfile.ZipFile(StringIO(rv.data), 'r') + zf = zipfile.ZipFile(StringIO(resp.data), 'r') data = zf.read(zf.namelist()[0]) self._can_decrypt_with_key(data, config.JOURNALIST_KEY) self._can_decrypt_with_key(data, crypto_util.getkey(sid), codename) @@ -401,39 +388,39 @@ def helper_test_reply(self, test_reply, expected_success=True): self.helper_filenames_delete(soup, last_reply_number) with self.source_app as source_app: - rv = source_app.post('/login', data=dict(codename=codename), + resp = source_app.post('/login', data=dict(codename=codename), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - rv = source_app.get('/lookup') - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) + resp = source_app.get('/lookup') + self.assertEqual(resp.status_code, 200) if not expected_success: # there should be no reply - self.assertNotIn("You have received a reply.", rv.data) + self.assertNotIn("You have received a reply.", resp.data) else: self.assertIn( "You have received a reply. For your security, please delete all replies when you're done with them.", - rv.data) - self.assertIn(test_reply, rv.data) - soup = BeautifulSoup(rv.data) + resp.data) + self.assertIn(test_reply, resp.data) + soup = BeautifulSoup(resp.data) msgid = soup.select( 'form.message > input[name="reply_filename"]')[0]['value'] - rv = source_app.post('/delete', data=dict( + resp = source_app.post('/delete', data=dict( sid=sid, reply_filename=msgid ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Reply deleted", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Reply deleted", resp.data) # Make sure the reply is deleted from the filesystem - self._wait_for( + Async.wait_for_assertion( lambda: self.assertFalse( os.path.exists( store.path( sid, msgid)))) - common.logout(source_app) + source_app.get('/logout') def test_delete_collection(self): """Test the "delete collection" button on each collection page""" @@ -445,27 +432,27 @@ def test_delete_collection(self): fh=(StringIO(''), ''), ), follow_redirects=True) - rv = self.journalist_app.get('/') + resp = self.journalist_app.get('/') # navigate to the collection page - soup = BeautifulSoup(rv.data) + soup = BeautifulSoup(resp.data) first_col_url = soup.select('ul#cols > li a')[0]['href'] - rv = self.journalist_app.get(first_col_url) - self.assertEqual(rv.status_code, 200) + resp = self.journalist_app.get(first_col_url) + self.assertEqual(resp.status_code, 200) # find the delete form and extract the post parameters - soup = BeautifulSoup(rv.data) + soup = BeautifulSoup(resp.data) delete_form_inputs = soup.select('form#delete_collection')[0]('input') sid = delete_form_inputs[1]['value'] col_name = delete_form_inputs[2]['value'] - rv = self.journalist_app.post('/col/delete/' + sid, + resp = self.journalist_app.post('/col/delete/' + sid, follow_redirects=True) - self.assertEquals(rv.status_code, 200) + self.assertEquals(resp.status_code, 200) - self.assertIn(escape("%s's collection deleted" % (col_name,)), rv.data) - self.assertIn("No documents have been submitted!", rv.data) + self.assertIn(escape("%s's collection deleted" % (col_name,)), resp.data) + self.assertIn("No documents have been submitted!", resp.data) # Make sure the collection is deleted from the filesystem - self._wait_for( + Async.wait_for_assertion( lambda: self.assertFalse(os.path.exists(store.path(sid))) ) @@ -481,22 +468,22 @@ def test_delete_collections(self): msg="This is a test " + str(i) + ".", fh=(StringIO(''), ''), ), follow_redirects=True) - common.logout(self.source_app) + self.source_app.get('/logout') - rv = self.journalist_app.get('/') + resp = self.journalist_app.get('/') # get all the checkbox values - soup = BeautifulSoup(rv.data) + soup = BeautifulSoup(resp.data) checkbox_values = [checkbox['value'] for checkbox in soup.select('input[name="cols_selected"]')] - rv = self.journalist_app.post('/col/process', data=dict( + resp = self.journalist_app.post('/col/process', data=dict( action='delete', cols_selected=checkbox_values ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("%s collections deleted" % (num_sources,), rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("%s collections deleted" % (num_sources,), resp.data) # Make sure the collections are deleted from the filesystem - self._wait_for(lambda: self.assertFalse( + Async.wait_for_assertion(lambda: self.assertFalse( any([os.path.exists(store.path(sid)) for sid in checkbox_values]))) def test_filenames(self): @@ -507,14 +494,14 @@ def test_filenames(self): self.helper_filenames_submit() # navigate to the collection page - rv = self.journalist_app.get('/') - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get('/') + soup = BeautifulSoup(resp.data) first_col_url = soup.select('ul#cols > li a')[0]['href'] - rv = self.journalist_app.get(first_col_url) - self.assertEqual(rv.status_code, 200) + resp = self.journalist_app.get(first_col_url) + self.assertEqual(resp.status_code, 200) # test filenames and sort order - soup = BeautifulSoup(rv.data) + soup = BeautifulSoup(resp.data) submission_filename_re = r'^{0}-[a-z0-9-_]+(-msg|-doc\.gz)\.gpg$' for i, submission_link in enumerate( soup.select('ul#submissions li a .filename')): @@ -530,17 +517,17 @@ def test_filenames_delete(self): self.helper_filenames_submit() # navigate to the collection page - rv = self.journalist_app.get('/') - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get('/') + soup = BeautifulSoup(resp.data) first_col_url = soup.select('ul#cols > li a')[0]['href'] - rv = self.journalist_app.get(first_col_url) - self.assertEqual(rv.status_code, 200) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get(first_col_url) + self.assertEqual(resp.status_code, 200) + soup = BeautifulSoup(resp.data) # delete file #2 self.helper_filenames_delete(soup, 1) - rv = self.journalist_app.get(first_col_url) - soup = BeautifulSoup(rv.data) + resp = self.journalist_app.get(first_col_url) + soup = BeautifulSoup(resp.data) # test filenames and sort order submission_filename_re = r'^{0}-[a-z0-9-_]+(-msg|-doc\.gz)\.gpg$' @@ -564,15 +551,15 @@ def test_user_change_password(self): )) # logout - common.logout(self.journalist_app) + self.journalist_app.get('/logout') # login with new credentials should redirect to index page - rv = self.journalist_app.post('/login', data=dict( + resp = self.journalist_app.post('/login', data=dict( username=self.user.username, password='newpass', token='mocked', follow_redirects=True)) - self.assertEqual(rv.status_code, 302) + self.assertEqual(resp.status_code, 302) def test_login_after_regenerate_hotp(self): """Test that journalists can login after resetting their HOTP 2fa""" @@ -582,20 +569,20 @@ def test_login_after_regenerate_hotp(self): otp_secret=123456)) # successful verificaton should redirect to /account - rv = self.journalist_app.post('/account/2fa', data=dict( + resp = self.journalist_app.post('/account/2fa', data=dict( token=self.user.hotp)) - self.assertEqual(rv.status_code, 302) + self.assertEqual(resp.status_code, 302) # log out - common.logout(self.journalist_app) + self.journalist_app.get('/logout') # login with new 2fa secret should redirect to index page - rv = self.journalist_app.post('/login', data=dict( + resp = self.journalist_app.post('/login', data=dict( username=self.user.username, password=self.user_pw, token=self.user.hotp, follow_redirects=True)) - self.assertEqual(rv.status_code, 302) + self.assertEqual(resp.status_code, 302) def helper_filenames_submit(self): self.source_app.post('/submit', data=dict( @@ -617,27 +604,27 @@ def helper_filenames_delete(self, soup, i): soup.select('input[name="doc_names_selected"]')[i]['value']] # delete - rv = self.journalist_app.post('/bulk', data=dict( + resp = self.journalist_app.post('/bulk', data=dict( sid=sid, action='confirm_delete', doc_names_selected=checkbox_values ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) self.assertIn( "The following file has been selected for permanent deletion", - rv.data) + resp.data) # confirm delete - rv = self.journalist_app.post('/bulk', data=dict( + resp = self.journalist_app.post('/bulk', data=dict( sid=sid, action='delete', doc_names_selected=checkbox_values ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Submission deleted.", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Submission deleted.", resp.data) # Make sure the files were deleted from the filesystem - self._wait_for(lambda: self.assertFalse( + Async.wait_for_assertion(lambda: self.assertFalse( any([os.path.exists(store.path(sid, doc_name)) for doc_name in checkbox_values]))) diff --git a/securedrop/tests/test_unit_journalist.py b/securedrop/tests/test_unit_journalist.py index cf9e29a2b5..52217d8d3e 100644 --- a/securedrop/tests/test_unit_journalist.py +++ b/securedrop/tests/test_unit_journalist.py @@ -1,356 +1,392 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Set environment variable so config.py uses a test environment -os.environ['SECUREDROP_ENV'] = 'test' - +import crypto_util from cStringIO import StringIO -import datetime from flask import url_for, escape from flask_testing import TestCase -import mock import os +import random import time import unittest import zipfile -import common -import crypto_util -import journalist +# Set environment variable so config.py uses a test environment +os.environ['SECUREDROP_ENV'] = 'test' + +import config +from common import (Async, SetUp, TearDown, TestJournalist, TestReply, + TestSource, TestSubmission) from db import (db_session, InvalidPasswordLength, Journalist, Reply, Source, Submission) +import journalist -class TestJournalist(TestCase): +class TestJournalistApp(TestCase): + # A method required by flask_testing.TestCase def create_app(self): return journalist.app - def add_source_and_submissions(self): - sid = crypto_util.hashd_codename(crypto_util.genrandomid()) - codename = crypto_util.display_id() - crypto_util.genkeypair(sid, codename) - source = Source(sid, codename) - db_session.add(source) - db_session.commit() - files = ['1-abc1-msg.gpg', '2-abc2-msg.gpg'] - filenames = common.setup_test_docs(sid, files) - return source, files - - def add_source_and_replies(self): - source, files = self.add_source_and_submissions() - files = ['1-def-reply.gpg', '2-def-reply.gpg'] - filenames = common.setup_test_replies(source.filesystem_id, - self.user.id, - files) - return source, files - def setUp(self): - common.shared_setup() + SetUp.setup() # Patch the two-factor verification to avoid intermittent errors - patcher = mock.patch('db.Journalist.verify_token') - self.addCleanup(patcher.stop) - self.mock_journalist_verify_token = patcher.start() - self.mock_journalist_verify_token.return_value = True - - # Set up test users - self.user_pw = "bar" - self.user = Journalist(username="foo", - password=self.user_pw) - self.admin_user_pw = "admin" - self.admin_user = Journalist(username="admin", - password=self.admin_user_pw, - is_admin=True) - db_session.add(self.user) - db_session.add(self.admin_user) - db_session.commit() + TestJournalist.mock_verify_token(self) - def tearDown(self): - common.shared_teardown() - - def test_index_should_redirect_to_login(self): - res = self.client.get(url_for('index')) - self.assertRedirects(res, url_for('login')) - - def test_invalid_user_login_should_fail(self): - res = self.client.post(url_for('login'), data=dict( - username='invalid', - password='invalid', - token='mocked')) - self.assert200(res) - self.assertIn("Login failed", res.data) - - def test_valid_user_login_should_succeed(self): - res = self.client.post(url_for('login'), data=dict( - username=self.user.username, - password=self.user_pw, - token='mocked'), - follow_redirects=True) + # Setup test users: user & admin + self.user, self.user_pw = TestJournalist.init_journalist() + self.admin, self.admin_pw = TestJournalist.init_admin() - self.assert200(res) # successful login redirects to index - self.assertIn("Sources", res.data) - self.assertIn("No documents have been submitted!", res.data) - - def test_normal_and_admin_user_login_should_redirect_to_index(self): - """Normal users and admin users should both redirect to the index page after logging in successfully""" - res = self.client.post(url_for('login'), data=dict( - username=self.user.username, - password=self.user_pw, - token='mocked')) - self.assertRedirects(res, url_for('index')) - - res = self.client.post(url_for('login'), data=dict( - username=self.admin_user.username, - password=self.admin_user_pw, - token='mocked')) - self.assertRedirects(res, url_for('index')) - - def test_admin_user_has_admin_link_in_index(self): - res = self.client.post(url_for('login'), data=dict( - username=self.admin_user.username, - password=self.admin_user_pw, - token='mocked'), - follow_redirects=True) - admin_link = '{}'.format( - url_for('admin_index'), - "Admin") - self.assertIn(admin_link, res.data) - - def test_user_has_edit_account_link_in_index(self): - res = self.client.post(url_for('login'), data=dict( - username=self.user.username, - password=self.user_pw, - token='mocked'), - follow_redirects=True) - edit_account_link = '{}'.format( - url_for('edit_account'), - "Edit Account") - self.assertIn(edit_account_link, res.data) + def tearDown(self): + TearDown.teardown() + + def test_unauthorized_access_redirects_to_login(self): + resp = self.client.get(url_for('index')) + self.assertRedirects(resp, url_for('login')) + + def test_invalid_credentials(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.user.username, + password='invalid', + token='mocked')) + self.assert200(resp) + self.assertIn("Login failed", resp.data) + + def test_valid_credentials(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.user.username, + password=self.user_pw, + token='mocked'), + follow_redirects=True) + self.assert200(resp) # successful login redirects to index + self.assertIn("Sources", resp.data) + self.assertIn("No documents have been submitted!", resp.data) + + def test_admin_login_redirects_to_index(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.admin.username, + password=self.admin_pw, + token='mocked')) + self.assertRedirects(resp, url_for('index')) + + def test_user_login_redirects_to_index(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.user.username, + password=self.user_pw, + token='mocked')) + self.assertRedirects(resp, url_for('index')) + + def test_admin_has_link_to_edit_account_page_in_index_page(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.admin.username, + password=self.admin_pw, + token='mocked'), + follow_redirects=True) + edit_account_link = '{}'.format(url_for('edit_account'), + "Edit Account") + self.assertIn(edit_account_link, resp.data) + + def test_user_has_link_to_edit_account_page_in_index_page(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.user.username, + password=self.user_pw, + token='mocked'), + follow_redirects=True) + edit_account_link = '{}'.format(url_for('edit_account'), + "Edit Account") + self.assertIn(edit_account_link, resp.data) + + + def test_admin_has_link_to_admin_index_page_in_index_page(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.admin.username, + password=self.admin_pw, + token='mocked'), + follow_redirects=True) + admin_link = '{}'.format(url_for('admin_index'), + "Admin") + self.assertIn(admin_link, resp.data) + + def test_user_lacks_link_to_admin_index_page_in_index_page(self): + resp = self.client.post(url_for('login'), + data=dict(username=self.user.username, + password=self.user_pw, + token='mocked'), + follow_redirects=True) + admin_link = '{}'.format(url_for('admin_index'), + "Admin") + self.assertNotIn(admin_link, resp.data) + + # WARNING: we are purposely doing something that would not work in + # production in the _login_user and _login_admin methods. This is done as a + # reminder to the test developer that the flask_testing.TestCase only uses + # one request context per method (see + # https://github.com/freedomofpress/securedrop/issues/1444). By explicitly + # making a point of this, we hope to avoid the introduction of new tests, + # that do not truly prove their result because of this disconnect between + # request context in Flask Testing and production. + # + # TODO: either ditch Flask Testing or subclass it as discussed in the + # aforementioned issue to fix the described problem. + def _login_admin(self): + self._ctx.g.user = self.admin def _login_user(self): - self.client.post(url_for('login'), data=dict( - username=self.user.username, - password=self.user_pw, - token='mocked'), - follow_redirects=True) + self._ctx.g.user = self.user - def _login_admin(self): - self.client.post(url_for('login'), data=dict( - username=self.admin_user.username, - password=self.admin_user_pw, - token='mocked'), - follow_redirects=True) + def test_admin_logout_redirects_to_index(self): + self._login_admin() + resp = self.client.get(url_for('logout')) + self.assertRedirects(resp, url_for('index')) - def test_user_logout(self): + def test_user_logout_redirects_to_index(self): self._login_user() - res = self.client.get(url_for('logout')) - self.assertRedirects(res, url_for('index')) - - def test_admin_logout(self): - self._login_admin() - res = self.client.get(url_for('logout')) - self.assertRedirects(res, url_for('index')) + resp = self.client.get(url_for('logout')) + self.assertRedirects(resp, url_for('index')) def test_admin_index(self): self._login_admin() - res = self.client.get(url_for('admin_index')) - self.assert200(res) - self.assertIn("Admin Interface", res.data) + resp = self.client.get(url_for('admin_index')) + self.assert200(resp) + self.assertIn("Admin Interface", resp.data) def test_admin_delete_user(self): + # Verify journalist is in the database + self.assertNotEqual(Journalist.query.get(self.user.id), None) + self._login_admin() + resp = self.client.post(url_for('admin_delete_user', user_id=self.user.id), + follow_redirects=True) - res = self.client.post( - url_for('admin_delete_user', user_id=self.user.id), - follow_redirects=True) - self.assert200(res) + # Assert correct interface behavior + self.assert200(resp) self.assertIn(escape("Deleted user '{}'".format(self.user.username)), - res.data) - - # verify journalist foo is no longer in the database - user = Journalist.query.get(self.user.id) - self.assertEqual(user, None) + resp.data) + # Verify journalist is no longer in the database + self.assertEqual(Journalist.query.get(self.user.id), None) - def test_admin_delete_invalid_user(self): + def test_admin_deletes_invalid_user_404(self): self._login_admin() - invalid_user_pk = max([user.id for user in Journalist.query.all()]) + 1 - res = self.client.post(url_for('admin_delete_user', + resp = self.client.post(url_for('admin_delete_user', user_id=invalid_user_pk)) - self.assert404(res) + self.assert404(resp) - def test_admin_edits_user_password_valid(self): + def test_admin_edits_user_password_success_response(self): self._login_admin() - res = self.client.post( + resp = self.client.post( url_for('admin_edit_user', user_id=self.user.id), - data=dict(username='foo', is_admin=False, + data=dict(username=self.user.username, is_admin=False, password='valid', password_again='valid')) - self.assertIn('Password successfully changed', res.data) + self.assertIn('Password successfully changed', resp.data) - def test_admin_edits_user_password_dont_match(self): + def test_user_edits_password_success_reponse(self): + self._login_user() + resp = self.client.post(url_for('edit_account'), + data=dict(password='valid', + password_again='valid')) + self.assertIn("Password successfully changed", resp.data) + + def test_admin_edits_user_password_mismatch_warning(self): self._login_admin() - res = self.client.post( + resp = self.client.post( url_for('admin_edit_user', user_id=self.user.id), - data=dict(username='foo', is_admin=False, password='not', - password_again='thesame'), + data=dict(username=self.user.username, is_admin=False, + password='not', password_again='thesame'), follow_redirects=True) - self.assertIn(escape("Passwords didn't match"), res.data) + self.assertIn(escape("Passwords didn't match"), resp.data) + + def test_user_edits_password_mismatch_redirect(self): + self._login_user() + resp = self.client.post(url_for('edit_account'), data=dict( + password='not', + password_again='thesame')) + self.assertRedirects(resp, url_for('edit_account')) + + def test_admin_add_user_password_mismatch_warning(self): + self._login_admin() + resp = self.client.post(url_for('admin_add_user'), + data=dict(username='dellsberg', + password='not', + password_again='thesame', + is_admin=False)) + self.assertIn('Passwords didn', resp.data) + + def test_max_password_length(self): + """Creating a Journalist with a password that is greater than the + maximum password length should raise an exception""" + overly_long_password = 'a'*(Journalist.MAX_PASSWORD_LEN + 1) + with self.assertRaises(InvalidPasswordLength): + temp_journalist = Journalist( + username="My Password is Too Big!", + password=overly_long_password) - def test_admin_edits_user_password_too_long(self): + def test_admin_edits_user_password_too_long_warning(self): self._login_admin() overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1) - res = self.client.post( + resp = self.client.post( url_for('admin_edit_user', user_id=self.user.id), - data=dict(username='foo', is_admin=False, + data=dict(username=self.user.username, is_admin=False, password=overly_long_password, password_again=overly_long_password), follow_redirects=True) - self.assertIn('Your password is too long', res.data) + self.assertIn('Your password is too long', resp.data) + + def test_user_edits_password_too_long_warning(self): + self._login_user() + overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1) + + resp = self.client.post(url_for('edit_account'), + data=dict(password=overly_long_password, + password_again=overly_long_password), + follow_redirects=True) + + self.assertIn('Your password is too long', resp.data) + + def test_admin_add_user_password_too_long_warning(self): + self._login_admin() + + overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1) + resp = self.client.post( + url_for('admin_add_user'), + data=dict(username='dellsberg', password=overly_long_password, + password_again=overly_long_password, is_admin=False)) + + self.assertIn('password is too long', resp.data) def test_admin_edits_user_invalid_username(self): """Test expected error message when admin attempts to change a user's username to a username that is taken by another user.""" self._login_admin() + new_username = self.admin.username - new_username = self.admin_user.username - res = self.client.post( + resp = self.client.post( url_for('admin_edit_user', user_id=self.user.id), data=dict(username=new_username, is_admin=False, - password='', password_again='') - ) + password='', password_again='')) self.assertIn('Username {} is already taken'.format(new_username), - res.data) + resp.data) - def test_admin_reset_hotp_success(self): + def test_admin_resets_user_hotp(self): self._login_admin() old_hotp = self.user.hotp.secret - res = self.client.post( - url_for('admin_reset_two_factor_hotp'), - data=dict(uid=self.user.id, otp_secret=123456) - ) - + resp = self.client.post(url_for('admin_reset_two_factor_hotp'), + data=dict(uid=self.user.id, otp_secret=123456)) new_hotp = self.user.hotp.secret + # check that hotp is different self.assertNotEqual(old_hotp, new_hotp) - - self.assertRedirects(res, + # Redirect to admin 2FA view + self.assertRedirects(resp, url_for('admin_new_user_two_factor', uid=self.user.id)) - def test_admin_reset_hotp_empty(self): - self._login_admin() - res = self.client.post( - url_for('admin_reset_two_factor_hotp'), - data=dict(uid=self.user.id) - ) + def test_user_resets_hotp(self): + self._login_user() + oldHotp = self.user.hotp + + resp = self.client.post(url_for('account_reset_two_factor_hotp'), + data=dict(otp_secret=123456)) + newHotp = self.user.hotp - self.assertIn('Change Secret', res.data) + # check that hotp is different + self.assertNotEqual(oldHotp, newHotp) + # should redirect to verification page + self.assertRedirects(resp, url_for('account_new_two_factor')) - def test_admin_reset_totp_success(self): + def test_admin_resets_user_totp(self): self._login_admin() old_totp = self.user.totp - res = self.client.post( + resp = self.client.post( url_for('admin_reset_two_factor_totp'), - data=dict(uid=self.user.id) - ) + data=dict(uid=self.user.id)) new_totp = self.user.totp self.assertNotEqual(old_totp, new_totp) - self.assertRedirects(res, + self.assertRedirects(resp, url_for('admin_new_user_two_factor', uid=self.user.id)) - def test_admin_new_user_2fa_success(self): - self._login_admin() - - res = self.client.post( - url_for('admin_new_user_two_factor', uid=self.user.id), - data=dict(token='mocked') - ) - - self.assertRedirects(res, url_for('admin_index')) + def test_user_resets_totp(self): + self._login_user() + oldTotp = self.user.totp - def test_admin_new_user_2fa_get_req(self): - self._login_admin() + resp = self.client.post(url_for('account_reset_two_factor_totp')) + newTotp = self.user.totp - res = self.client.get( - url_for('admin_new_user_two_factor', uid=self.user.id) - ) + # check that totp is different + self.assertNotEqual(oldTotp, newTotp) - # any GET req should take a user to the admin_new_user_two_factor page - self.assertIn('Authenticator', res.data) + # should redirect to verification page + self.assertRedirects(resp, url_for('account_new_two_factor')) - def test_admin_add_user_get_req(self): + def test_admin_resets_hotp_with_missing_otp_secret_key(self): self._login_admin() + resp = self.client.post(url_for('admin_reset_two_factor_hotp'), + data=dict(uid=self.user.id)) - res = self.client.get(url_for('admin_add_user')) - - # any GET req should take a user to the admin_add_user page - self.assertIn('Add user', res.data) + self.assertIn('Change Secret', resp.data) - def test_admin_add_user_success(self): + def test_admin_new_user_2fa_redirect(self): self._login_admin() + resp = self.client.post( + url_for('admin_new_user_two_factor', uid=self.user.id), + data=dict(token='mocked')) + self.assertRedirects(resp, url_for('admin_index')) - res = self.client.post( - url_for('admin_add_user'), - data=dict(username='dellsberg', - password='pentagonpapers', - password_again='pentagonpapers', - is_admin=False) - ) - - self.assertRedirects(res, url_for('admin_new_user_two_factor', uid=3)) - - def test_admin_add_user_failure_no_username(self): + def test_http_get_on_admin_new_user_two_factor_page(self): self._login_admin() + resp = self.client.get(url_for('admin_new_user_two_factor', uid=self.user.id)) + # any GET req should take a user to the admin_new_user_two_factor page + self.assertIn('Authenticator', resp.data) - res = self.client.post( - url_for('admin_add_user'), - data=dict(username='', password='pentagonpapers', - password_again='pentagonpapers', is_admin=False)) - - self.assertIn('Missing username', res.data) - - def test_admin_add_user_failure_passwords_dont_match(self): + def test_http_get_on_admin_add_user_page(self): self._login_admin() + resp = self.client.get(url_for('admin_add_user')) + # any GET req should take a user to the admin_add_user page + self.assertIn('Add user', resp.data) - res = self.client.post( - url_for('admin_add_user'), - data=dict(username='dellsberg', password='not', - password_again='thesame', is_admin=False)) - - self.assertIn('Passwords didn', res.data) - - def test_admin_add_user_failure_password_too_long(self): + def test_admin_add_user(self): self._login_admin() + max_journalist_pk = max([user.id for user in Journalist.query.all()]) - overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1) - res = self.client.post( - url_for('admin_add_user'), - data=dict(username='dellsberg', password=overly_long_password, - password_again=overly_long_password, is_admin=False)) + resp = self.client.post(url_for('admin_add_user'), + data=dict(username='dellsberg', + password='pentagonpapers', + password_again='pentagonpapers', + is_admin=False)) - self.assertIn('password is too long', res.data) + self.assertRedirects(resp, url_for('admin_new_user_two_factor', + uid=max_journalist_pk+1)) - def test_admin_authorization_for_gets(self): + def test_admin_add_user_without_username(self): + self._login_admin() + resp = self.client.post(url_for('admin_add_user'), + data=dict(username='', + password='pentagonpapers', + password_again='pentagonpapers', + is_admin=False)) + self.assertIn('Missing username', resp.data) + + def test_admin_page_restriction_http_gets(self): admin_urls = [url_for('admin_index'), url_for('admin_add_user'), url_for('admin_edit_user', user_id=self.user.id)] self._login_user() for admin_url in admin_urls: - res = self.client.get(admin_url) - self.assertStatus(res, 302) + resp = self.client.get(admin_url) + self.assertStatus(resp, 302) - def test_admin_authorization_for_posts(self): + def test_admin_page_restrction_http_posts(self): admin_urls = [url_for('admin_reset_two_factor_totp'), url_for('admin_reset_two_factor_hotp'), url_for('admin_add_user', user_id=self.user.id), @@ -361,8 +397,8 @@ def test_admin_authorization_for_posts(self): url_for('admin_delete_user', user_id=self.user.id)] self._login_user() for admin_url in admin_urls: - res = self.client.post(admin_url) - self.assertStatus(res, 302) + resp = self.client.post(admin_url) + self.assertStatus(resp, 302) def test_user_authorization_for_gets(self): urls = [url_for('index'), url_for('col', sid='1'), @@ -370,8 +406,8 @@ def test_user_authorization_for_gets(self): url_for('edit_account')] for url in urls: - res = self.client.get(url) - self.assertStatus(res, 302) + resp = self.client.get(url) + self.assertStatus(resp, 302) def test_user_authorization_for_posts(self): urls = [url_for('add_star', sid='1'), url_for('remove_star', sid='1'), @@ -381,265 +417,171 @@ def test_user_authorization_for_posts(self): url_for('account_reset_two_factor_totp'), url_for('account_reset_two_factor_hotp')] for url in urls: - res = self.client.post(url) - self.assertStatus(res, 302) - - def test_invalid_user_password_change(self): - self._login_user() - res = self.client.post(url_for('edit_account'), data=dict( - password='not', - password_again='thesame')) - self.assertRedirects(res, url_for('edit_account')) - - def test_too_long_user_password_change(self): - self._login_user() - overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1) - - res = self.client.post(url_for('edit_account'), data=dict( - password=overly_long_password, - password_again=overly_long_password), - follow_redirects=True) - - self.assertIn('Your password is too long', res.data) - - def test_valid_user_password_change(self): - self._login_user() - res = self.client.post(url_for('edit_account'), data=dict( - password='valid', - password_again='valid')) - self.assertIn("Password successfully changed", res.data) - - def test_regenerate_totp(self): - self._login_user() - oldTotp = self.user.totp - - res = self.client.post(url_for('account_reset_two_factor_totp')) - newTotp = self.user.totp - - # check that totp is different - self.assertNotEqual(oldTotp, newTotp) - - # should redirect to verification page - self.assertRedirects(res, url_for('account_new_two_factor')) - - def test_edit_hotp(self): - self._login_user() - oldHotp = self.user.hotp - - res = self.client.post( - url_for('account_reset_two_factor_hotp'), - data=dict(otp_secret=123456) - ) - newHotp = self.user.hotp - - # check that hotp is different - self.assertNotEqual(oldHotp, newHotp) - - # should redirect to verification page - self.assertRedirects(res, url_for('account_new_two_factor')) - - def test_delete_source_deletes_submissions(self): - """Verify that when a source is deleted, the submissions that - correspond to them are also deleted.""" - - source, files = self.add_source_and_submissions() - - journalist.delete_collection(source.filesystem_id) - - # Source should be gone - results = db_session.query(Source).filter(Source.id == source.id).all() - self.assertEqual(results, []) - - # Submissions should be gone - results = db_session.query(Submission.source_id == source.id).all() + resp = self.client.post(url) + self.assertStatus(resp, 302) + + def _delete_collection_setup(self): + self.source, _ = TestSource.init_source() + TestSubmission.submit(self.source, 2) + TestReply.reply(self.user, self.source, 2) + + def test_delete_collection_updates_db(self): + """Verify that when a source is deleted, their Source identity + record, as well as Reply & Submission records associated with + that record are purged from the database.""" + self._delete_collection_setup() + journalist.delete_collection(self.source.filesystem_id) + results = Source.query.filter(Source.id == self.source.id).all() self.assertEqual(results, []) - - def test_delete_source_deletes_replies(self): - """Verify that when a source is deleted, the replies that - correspond to them are also deleted.""" - - source, files = self.add_source_and_replies() - - journalist.delete_collection(source.filesystem_id) - - # Source should be gone - results = db_session.query(Source).filter(Source.id == source.id).all() + results = db_session.query(Submission.source_id == self.source.id).all() self.assertEqual(results, []) - - # Replies should be gone - results = db_session.query(Reply.source_id == source.id).all() + results = db_session.query(Reply.source_id == self.source.id).all() self.assertEqual(results, []) def test_delete_source_deletes_source_key(self): """Verify that when a source is deleted, the PGP key that corresponds to them is also deleted.""" - - source, files = self.add_source_and_submissions() - + self._delete_collection_setup() # Source key exists - source_key = crypto_util.getkey(source.filesystem_id) + source_key = crypto_util.getkey(self.source.filesystem_id) self.assertNotEqual(source_key, None) - journalist.delete_collection(source.filesystem_id) + journalist.delete_collection(self.source.filesystem_id) # Source key no longer exists - source_key = crypto_util.getkey(source.filesystem_id) + source_key = crypto_util.getkey(self.source.filesystem_id) self.assertEqual(source_key, None) def test_delete_source_deletes_docs_on_disk(self): """Verify that when a source is deleted, the encrypted documents that exist on disk is also deleted.""" - - source, files = self.add_source_and_submissions() - + self._delete_collection_setup() # Encrypted documents exists - dir_source_docs = os.path.join(config.STORE_DIR, source.filesystem_id) - + dir_source_docs = os.path.join(config.STORE_DIR, self.source.filesystem_id) self.assertTrue(os.path.exists(dir_source_docs)) - job = journalist.delete_collection(source.filesystem_id) + job = journalist.delete_collection(self.source.filesystem_id) - # Block for up to 5s to await asynchronous Redis job result - timeout = datetime.datetime.now() + datetime.timedelta(0,5) - while 1: - if job.result == "success": - break - elif datetime.datetime.now() > timeout: - self.assertTrue(False) + # Wait up to 5s to wait for Redis worker `srm` operation to complete + Async.wait_for_redis_worker(job) # Encrypted documents no longer exist - dir_source_docs = os.path.join(config.STORE_DIR, source.filesystem_id) self.assertFalse(os.path.exists(dir_source_docs)) - def test_download_selected_from_source(self): - sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R' - source = Source(sid, crypto_util.display_id()) - db_session.add(source) - db_session.commit() - files = ['1-abc1-msg.gpg', '2-abc2-msg.gpg', '3-abc3-msg.gpg', '4-abc4-msg.gpg'] - selected_files = files[:2] - unselected_files = files[2:] - common.setup_test_docs(sid, files) + def test_download_selected_submissions_from_source(self): + source, _ = TestSource.init_source() + submissions = set(TestSubmission.submit(source, 4)) - self._login_user() - rv = self.client.post('/bulk', - data=dict(action='download', sid=sid, - doc_names_selected=selected_files)) - - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv.content_type, 'application/zip') - self.assertTrue(zipfile.is_zipfile(StringIO(rv.data))) - - for file in selected_files: - self.assertTrue( - zipfile.ZipFile(StringIO(rv.data)).getinfo( - os.path.join(source.journalist_filename, file)) - ) + selected_submissions = random.sample(submissions, 2) + selected_fnames = [submission.filename + for submission in selected_submissions] - for file in unselected_files: + self._login_user() + resp = self.client.post( + '/bulk', data=dict(action='download', + sid=source.filesystem_id, + doc_names_selected=selected_fnames)) + # The download request was succesful, and the app returned a zipfile + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.content_type, 'application/zip') + self.assertTrue(zipfile.is_zipfile(StringIO(resp.data))) + # The submissions selected are in the zipfile + for filename in selected_fnames: + self.assertTrue(zipfile.ZipFile(StringIO(resp.data)).getinfo( + os.path.join(source.journalist_filename, filename))) + # The submissions not selected are absent from the zipfile + not_selected_submissions = submissions.difference(selected_submissions) + not_selected_fnames = [submission.filename + for submission in not_selected_submissions] + for filename in not_selected_fnames: try: - zipfile.ZipFile(StringIO(rv.data)).getinfo( - os.path.join(source.journalist_filename, file)) + zipfile.ZipFile(StringIO(resp.data)).getinfo( + os.path.join(source.journalist_filename, filename)) except KeyError: pass else: self.assertTrue(False) - def _setup_two_sources(self): - # Add two sources to the database - self.sid = crypto_util.hash_codename(crypto_util.genrandomid()) - self.sid2 = crypto_util.hash_codename(crypto_util.genrandomid()) - self.source = Source(self.sid, crypto_util.display_id()) - self.source2 = Source(self.sid2, crypto_util.display_id()) - db_session.add(self.source) - db_session.add(self.source2) - db_session.commit() - - # The sources have made two submissions each, both being messages - self.files = ['1-s1-msg.gpg', '2-s1-msg.gpg'] - common.setup_test_docs(self.sid, self.files) - self.files2 = ['1-s2-msg.gpg', '2-s2-msg.gpg'] - common.setup_test_docs(self.sid2, self.files2) - - # Mark the first message for each of the two sources read - for fn in (self.files[0], self.files2[0]): - s = Submission.query.filter(Submission.filename == fn).one() - s.downloaded = True - db_session.commit() - - - - def test_download_unread_selected_sources(self): - self._setup_two_sources() + def _bulk_download_setup(self): + """Create a couple sources, make some submissions on their behalf, + mark some of them as downloaded, and then perform *action* on all + sources.""" + self.source0, _ = TestSource.init_source() + self.source1, _ = TestSource.init_source() + self.submissions0 = set(TestSubmission.submit(self.source0, 2)) + self.submissions1 = set(TestSubmission.submit(self.source1, 3)) + self.downloaded0 = random.sample(self.submissions0, 1) + TestSubmission.mark_downloaded(*self.downloaded0) + self.not_downloaded0 = self.submissions0.difference(self.downloaded0) + self.downloaded1 = random.sample(self.submissions1, 2) + TestSubmission.mark_downloaded(*self.downloaded1) + self.not_downloaded1 = self.submissions1.difference(self.downloaded1) + + + def test_download_unread_all_sources(self): + self._bulk_download_setup() self._login_user() - resp = self.client.post('/col/process', - data=dict(action='download-unread', - cols_selected=[self.sid, self.sid2])) - - self.assertEqual(resp.status_code, 200) - self.assertEqual(resp.content_type, 'application/zip') - self.assertTrue(zipfile.is_zipfile(StringIO(resp.data))) - - for file in (self.files[1], self.files2[1]): + # Download all unread messages from all sources + self.resp = self.client.post( + '/col/process', + data=dict(action='download-unread', + cols_selected=[self.source0.filesystem_id, + self.source1.filesystem_id])) + + # The download request was succesful, and the app returned a zipfile + self.assertEqual(self.resp.status_code, 200) + self.assertEqual(self.resp.content_type, 'application/zip') + self.assertTrue(zipfile.is_zipfile(StringIO(self.resp.data))) + # All the not dowloaded submissions are in the zipfile + for submission in self.not_downloaded0.union(self.not_downloaded1): self.assertTrue( - zipfile.ZipFile(StringIO(resp.data)).getinfo( - os.path.join('unread', file)) + zipfile.ZipFile(StringIO(self.resp.data)).getinfo( + os.path.join('unread', submission.filename)) ) - - for file in (self.files[0], self.files2[0]): + # All the downloaded submissions are absent from the zipfile + for submission in self.downloaded0 + self.downloaded1: try: - zipfile.ZipFile(StringIO(resp.data)).getinfo( - os.path.join('unread', file)) + zipfile.ZipFile(StringIO(self.resp.data)).getinfo( + os.path.join('unread', submission.filename)) except KeyError: pass else: self.assertTrue(False) def test_download_all_selected_sources(self): - self._setup_two_sources() + self._bulk_download_setup() self._login_user() - resp = self.client.post('/col/process', - data=dict(action='download-all', - cols_selected=[self.sid2])) - - self.assertEqual(resp.status_code, 200) - self.assertEqual(resp.content_type, 'application/zip') - self.assertTrue(zipfile.is_zipfile(StringIO(resp.data))) - - for file in self.files2: + # Dowload all messages from self.source1 + self.resp = self.client.post( + '/col/process', + data=dict(action='download-all', + cols_selected=[self.source1.filesystem_id])) + + # The download request was succesful, and the app returned a zipfile + self.assertEqual(self.resp.status_code, 200) + self.assertEqual(self.resp.content_type, 'application/zip') + self.assertTrue(zipfile.is_zipfile(StringIO(self.resp.data))) + # All messages from self.source1 are in the zipfile + for submission in self.submissions1: self.assertTrue( - zipfile.ZipFile(StringIO(resp.data)).getinfo( - os.path.join('all', file)) + zipfile.ZipFile(StringIO(self.resp.data)).getinfo( + os.path.join('all', submission.filename)) ) - - for file in self.files: + # All messages from self.source2 are absent from the zipfile + for submission in self.submissions0: try: - zipfile.ZipFile(StringIO(resp.data)).getinfo( - os.path.join('all', file)) + zipfile.ZipFile(StringIO(self.resp.data)).getinfo( + os.path.join('all', submission.filename)) except KeyError: pass else: self.assertTrue(False) - def test_max_password_length(self): - """Creating a Journalist with a password that is greater than the - maximum password length should raise an exception""" - overly_long_password = 'a'*(Journalist.MAX_PASSWORD_LEN + 1) - with self.assertRaises(InvalidPasswordLength): - temp_journalist = Journalist( - username="My Password is Too Big!", - password=overly_long_password) - - def test_add_star(self): + def test_add_star_redirects_to_index(self): + source, _ = TestSource.init_source() self._login_user() - - sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R' - source = Source(sid, crypto_util.display_id()) - db_session.add(source) - db_session.commit() - - res = self.client.post(url_for('add_star', sid=sid)) - self.assertRedirects(res, url_for('index')) + resp = self.client.post(url_for('add_star', sid=source.filesystem_id)) + self.assertRedirects(resp, url_for('index')) if __name__ == "__main__": diff --git a/securedrop/tests/test_unit_source.py b/securedrop/tests/test_unit_source.py index 77eabfaa6a..b50593a7d3 100644 --- a/securedrop/tests/test_unit_source.py +++ b/securedrop/tests/test_unit_source.py @@ -1,30 +1,32 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import common +from bs4 import BeautifulSoup +from cStringIO import StringIO +from flask import session, escape +from flask_testing import TestCase +from mock import patch, ANY import os import re import unittest -from cStringIO import StringIO -from bs4 import BeautifulSoup -from flask_testing import TestCase -from flask import session, escape -from mock import patch, ANY -import source -from db import Source +# Set environment variable so config.py uses a test environment os.environ['SECUREDROP_ENV'] = 'test' +from common import SetUp, TearDown, TestSource +from db import Source +import source + -class TestSource(TestCase): +class TestSourceApp(TestCase): def create_app(self): return source.app def setUp(self): - common.shared_setup() + SetUp.setup() def tearDown(self): - common.shared_teardown() + TearDown.teardown() def test_index(self): """Test that the landing page loads and looks how we expect""" @@ -44,14 +46,14 @@ def _find_codename(self, html): def test_generate(self): with self.client as c: - rv = c.get('/generate') - self.assertEqual(rv.status_code, 200) + resp = c.get('/generate') + self.assertEqual(resp.status_code, 200) session_codename = session['codename'] - self.assertIn("Remember this codename and keep it secret", rv.data) + self.assertIn("Remember this codename and keep it secret", resp.data) self.assertIn( "To protect your identity, we're assigning you a unique codename.", - rv.data) - codename = self._find_codename(rv.data) + resp.data) + codename = self._find_codename(resp.data) # default codename length is 7 words self.assertEqual(len(codename.split()), 7) # codename is also stored in the session - make sure it matches the @@ -80,90 +82,90 @@ def test_generate_has_login_link(self): """The generate page should have a link to remind people to login if they already have a codename, rather than create a new one. """ - rv = self.client.get('/generate') - self.assertIn("Already have a codename?", rv.data) - soup = BeautifulSoup(rv.data) + resp = self.client.get('/generate') + self.assertIn("Already have a codename?", resp.data) + soup = BeautifulSoup(resp.data) already_have_codename_link = soup.select('a#already-have-codename')[0] self.assertEqual(already_have_codename_link['href'], '/login') def test_generate_already_logged_in(self): self._new_codename() # Make sure it redirects to /lookup when logged in - rv = self.client.get('/generate') - self.assertEqual(rv.status_code, 302) + resp = self.client.get('/generate') + self.assertEqual(resp.status_code, 302) # Make sure it flashes the message on the lookup page - rv = self.client.get('/generate', follow_redirects=True) + resp = self.client.get('/generate', follow_redirects=True) # Should redirect to /lookup - self.assertEqual(rv.status_code, 200) - self.assertIn("because you are already logged in.", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("because you are already logged in.", resp.data) def test_create(self): with self.client as c: - rv = c.get('/generate') + resp = c.get('/generate') codename = session['codename'] - rv = c.post('/create', follow_redirects=True) + resp = c.post('/create', follow_redirects=True) self.assertTrue(session['logged_in']) # should be redirected to /lookup - self.assertIn("Submit documents and messages", rv.data) + self.assertIn("Submit documents and messages", resp.data) def _new_codename(self): - return common.new_codename(self.client, session) + return TestSource.new_codename(self.client, session) def test_lookup(self): - """Test various elements on the /lookup page""" + """Test various elements on the /lookup page.""" codename = self._new_codename() - rv = self.client.post('login', data=dict(codename=codename), - follow_redirects=True) + resp = self.client.post('login', data=dict(codename=codename), + follow_redirects=True) # redirects to /lookup - self.assertIn("public key", rv.data) + self.assertIn("public key", resp.data) # download the public key - rv = self.client.get('journalist-key') - self.assertIn("BEGIN PGP PUBLIC KEY BLOCK", rv.data) + resp = self.client.get('journalist-key') + self.assertIn("BEGIN PGP PUBLIC KEY BLOCK", resp.data) def test_login_and_logout(self): - rv = self.client.get('/login') - self.assertEqual(rv.status_code, 200) - self.assertIn("Login to check for responses", rv.data) + resp = self.client.get('/login') + self.assertEqual(resp.status_code, 200) + self.assertIn("Login to check for responses", resp.data) codename = self._new_codename() with self.client as c: - rv = c.post('/login', data=dict(codename=codename), + resp = c.post('/login', data=dict(codename=codename), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Submit documents and messages", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Submit documents and messages", resp.data) self.assertTrue(session['logged_in']) - common.logout(c) + resp = c.get('/logout', follow_redirects=True) with self.client as c: - rv = self.client.post('/login', data=dict(codename='invalid'), + resp = self.client.post('/login', data=dict(codename='invalid'), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn('Sorry, that is not a recognized codename.', rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn('Sorry, that is not a recognized codename.', resp.data) self.assertNotIn('logged_in', session) with self.client as c: - rv = c.post('/login', data=dict(codename=codename), + resp = c.post('/login', data=dict(codename=codename), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) self.assertTrue(session['logged_in']) - rv = c.get('/logout', follow_redirects=True) + resp = c.get('/logout', follow_redirects=True) self.assertTrue(not session) - self.assertIn('Thank you for logging out.', rv.data) + self.assertIn('Thank you for logging out.', resp.data) def test_login_with_whitespace(self): """Test that codenames with leading or trailing whitespace still work""" def login_test(codename): - rv = self.client.get('/login') - self.assertEqual(rv.status_code, 200) - self.assertIn("Login to check for responses", rv.data) + resp = self.client.get('/login') + self.assertEqual(resp.status_code, 200) + self.assertIn("Login to check for responses", resp.data) with self.client as c: - rv = c.post('/login', data=dict(codename=codename), + resp = c.post('/login', data=dict(codename=codename), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Submit documents and messages", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Submit documents and messages", resp.data) self.assertTrue(session['logged_in']) - common.logout(c) + resp = c.get('/logout', follow_redirects=True) codename = self._new_codename() login_test(codename + ' ') @@ -188,29 +190,29 @@ def test_initial_submission_notification(self): reminding sources to check back later for replies. """ self._new_codename() - rv = self._dummy_submission() - self.assertEqual(rv.status_code, 200) + resp = self._dummy_submission() + self.assertEqual(resp.status_code, 200) self.assertIn( "Thanks for submitting something to SecureDrop! Please check back later for replies.", - rv.data) + resp.data) def test_submit_message(self): self._new_codename() self._dummy_submission() - rv = self.client.post('/submit', data=dict( + resp = self.client.post('/submit', data=dict( msg="This is a test.", fh=(StringIO(''), ''), ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Thanks! We received your message.", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Thanks! We received your message.", resp.data) def test_submit_empty_message(self): self._new_codename() - rv = self.client.post('/submit', data=dict( + resp = self.client.post('/submit', data=dict( msg="", fh=(StringIO(''), ''), ), follow_redirects=True) - self.assertIn("You must enter a message or choose a file to submit.", rv.data) + self.assertIn("You must enter a message or choose a file to submit.", resp.data) def test_submit_big_message(self): ''' @@ -220,43 +222,43 @@ def test_submit_big_message(self): ''' self._new_codename() self._dummy_submission() - rv = self.client.post('/submit', data=dict( + resp = self.client.post('/submit', data=dict( msg="AA" * (1024 * 512), fh=(StringIO(''), ''), ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Thanks! We received your message.", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Thanks! We received your message.", resp.data) def test_submit_file(self): self._new_codename() self._dummy_submission() - rv = self.client.post('/submit', data=dict( + resp = self.client.post('/submit', data=dict( msg="", fh=(StringIO('This is a test'), 'test.txt'), ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) + self.assertEqual(resp.status_code, 200) self.assertIn( escape( '{} "{}"'.format( "Thanks! We received your document", "test.txt")), - rv.data) + resp.data) def test_submit_both(self): self._new_codename() self._dummy_submission() - rv = self.client.post('/submit', data=dict( + resp = self.client.post('/submit', data=dict( msg="This is a test", fh=(StringIO('This is a test'), 'test.txt'), ), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Thanks! We received your message.", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Thanks! We received your message.", resp.data) self.assertIn( escape( '{} "{}"'.format( "Thanks! We received your document", 'test.txt')), - rv.data) + resp.data) @patch('gzip.GzipFile') def test_submit_sanitizes_filename(self, gzipfile): @@ -274,24 +276,24 @@ def test_submit_sanitizes_filename(self, gzipfile): fileobj=ANY) def test_tor2web_warning_headers(self): - rv = self.client.get('/', headers=[('X-tor2web', 'encrypted')]) - self.assertEqual(rv.status_code, 200) - self.assertIn("You appear to be using Tor2Web.", rv.data) + resp = self.client.get('/', headers=[('X-tor2web', 'encrypted')]) + self.assertEqual(resp.status_code, 200) + self.assertIn("You appear to be using Tor2Web.", resp.data) def test_tor2web_warning(self): - rv = self.client.get('/tor2web-warning') - self.assertEqual(rv.status_code, 200) - self.assertIn("Why is there a warning about Tor2Web?", rv.data) + resp = self.client.get('/tor2web-warning') + self.assertEqual(resp.status_code, 200) + self.assertIn("Why is there a warning about Tor2Web?", resp.data) def test_why_journalist_key(self): - rv = self.client.get('/why-journalist-key') - self.assertEqual(rv.status_code, 200) - self.assertIn("Why download the journalist's public key?", rv.data) + resp = self.client.get('/why-journalist-key') + self.assertEqual(resp.status_code, 200) + self.assertIn("Why download the journalist's public key?", resp.data) def test_howto_disable_js(self): - rv = self.client.get('/howto-disable-js') - self.assertEqual(rv.status_code, 200) - self.assertIn("Disable JavaScript to Protect Your Anonymity", rv.data) + resp = self.client.get('/howto-disable-js') + self.assertEqual(resp.status_code, 200) + self.assertIn("Disable JavaScript to Protect Your Anonymity", resp.data) @patch('crypto_util.hash_codename') def test_login_with_overly_long_codename(self, mock_hash_codename): @@ -299,12 +301,12 @@ def test_login_with_overly_long_codename(self, mock_hash_codename): an error, and scrypt should not be called to avoid DoS.""" overly_long_codename = 'a' * (Source.MAX_CODENAME_LEN + 1) with self.client as client: - rv = client.post( + resp = client.post( '/login', data=dict(codename=overly_long_codename), follow_redirects=True) - self.assertEqual(rv.status_code, 200) - self.assertIn("Sorry, that is not a recognized codename.", rv.data) + self.assertEqual(resp.status_code, 200) + self.assertIn("Sorry, that is not a recognized codename.", resp.data) self.assertFalse(mock_hash_codename.called, "Called hash_codename for codename w/ invalid length") diff --git a/securedrop/tests/test_unit_store.py b/securedrop/tests/test_unit_store.py index fc8f66cc3f..8b35b8483a 100644 --- a/securedrop/tests/test_unit_store.py +++ b/securedrop/tests/test_unit_store.py @@ -1,15 +1,18 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- + import os import unittest import zipfile + # Set environment variable so config.py uses a test environment os.environ['SECUREDROP_ENV'] = 'test' + +import crypto_util +from common import SetUp, TearDown, TestSource, TestSubmission import config -import store -import common from db import db_session, Source -import crypto_util +import store class TestStore(unittest.TestCase): @@ -17,10 +20,11 @@ class TestStore(unittest.TestCase): """The set of tests for store.py.""" def setUp(self): - common.shared_setup() + SetUp.setup() def tearDown(self): - common.shared_teardown() + TearDown.teardown() + db_session.remove() def test_verify(self): with self.assertRaises(store.PathException): @@ -29,16 +33,14 @@ def test_verify(self): store.verify(config.STORE_DIR + "_backup") def test_get_zip(self): - sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R' - source = Source(sid, crypto_util.display_id()) - db_session.add(source) - db_session.commit() - - files = ['1-abc1-msg.gpg', '2-abc2-msg.gpg'] - filenames = common.setup_test_docs(sid, files) + source, _ = TestSource.init_source() + submissions = TestSubmission.submit(source, 2) + filenames = [os.path.join(config.STORE_DIR, + source.filesystem_id, + submission.filename) + for submission in submissions] archive = zipfile.ZipFile(store.get_bulk_archive(filenames)) - archivefile_contents = archive.namelist() for archived_file, actual_file in zip(archivefile_contents, filenames): @@ -47,19 +49,14 @@ def test_get_zip(self): self.assertEquals(zipped_file_content, actual_file_content) def test_rename_valid_submission(self): - sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R' - - source_dir = os.path.join(config.STORE_DIR, sid) - if not os.path.exists(source_dir): - os.makedirs(source_dir) - - old_filename = '1-abc1-msg.gpg' - open(os.path.join(source_dir, old_filename), 'w').close() - - new_filestem = 'abc2' - expected_filename = '1-abc2-msg.gpg' - actual_filename = store.rename_submission(sid, old_filename, - new_filestem) + source, _ = TestSource.init_source() + old_journalist_filename = source.journalist_filename + old_filename = TestSubmission.submit(source, 1)[0].filename + new_journalist_filename = 'nestor_makhno' + expected_filename = old_filename.replace(old_journalist_filename, + new_journalist_filename) + actual_filename = store.rename_submission(source.filesystem_id, old_filename, + new_journalist_filename) self.assertEquals(actual_filename, expected_filename) if __name__ == "__main__":