diff --git a/securedrop/tests/common.py b/securedrop/tests/common.py
index 5d78211da8f..74d44cf8455 100644
--- a/securedrop/tests/common.py
+++ b/securedrop/tests/common.py
@@ -1,120 +1,291 @@
+# -*- coding: utf-8 -*-
+
+from functools import wraps
+import gnupg
+import mock
import os
import shutil
-import uuid
import subprocess
-
-import gnupg
+import time
# Set environment variable so config.py uses a test environment
os.environ['SECUREDROP_ENV'] = 'test'
+
import config
-from db import init_db, db_session, Journalist, Reply, Source, Submission
+from db import db_session, init_db, Journalist, Reply, Source, Submission
import crypto_util
+import store
-# TODO: the PID file for the redis worker is hard-coded below.
-# Ideally this constant would be provided by a test harness.
-# It has been intentionally omitted from `config.py.example`
-# in order to isolate the test vars from prod vars.
-# When refactoring the test suite, the TEST_WORKER_PIDFILE
-# TEST_WORKER_PIDFILE is also hard-coded in `manage.py`.
-TEST_WORKER_PIDFILE = "/tmp/securedrop_test_worker.pid"
+class TestJournalist:
+ """Test fixtures for working with :class:`db.Journalist`.
+ """
+ @staticmethod
+ def init_journalist(is_admin=False):
+ """Initialize a journalist into the database. Return their
+ :class:`db.Journalist` object and password string.
-def clean_root():
- shutil.rmtree(config.SECUREDROP_DATA_ROOT)
+ :param bool is_admin: Whether the user is an admin.
+ :returns: A 2-tuple. The first entry, the :class:`db.Journalist`
+ initialized. The second, their password string.
+ """
+ username = crypto_util.genrandomid()
+ user_pw = crypto_util.genrandomid()
+ user = Journalist(username, user_pw, is_admin)
+ db_session.add(user)
+ db_session.commit()
+ return user, user_pw
-def create_directories():
- # Create directories for the file store and the GPG keyring
- for d in (config.SECUREDROP_DATA_ROOT, config.STORE_DIR,
- config.GPG_KEY_DIR, config.TEMP_DIR):
- if not os.path.isdir(d):
- os.mkdir(d)
+ @staticmethod
+ def init_admin():
+ return TestJournalist.init_journalist(True)
+ @staticmethod
+ def mock_verify_token(self):
+ patcher = mock.patch('db.Journalist.verify_token')
+ self.addCleanup(patcher.stop)
+ self.mock_journalist_verify_token = patcher.start()
+ self.mock_journalist_verify_token.return_value = True
-def init_gpg():
- # Initialize the GPG keyring
- gpg = gnupg.GPG(homedir=config.GPG_KEY_DIR)
- # Import the journalist key for testing (faster to import a pre-generated
- # key than to gen a new one every time)
- for keyfile in ("test_journalist_key.pub", "test_journalist_key.sec"):
- gpg.import_keys(open(keyfile).read())
- return gpg
+class TestSource:
+ """Test fixtures for working with :class:`db.Source`.
+ """
+ @staticmethod
+ def init_source():
+ """Initialize a source: create their database record, the
+ filesystem directory that stores their submissions & replies,
+ and their GPG key encrypted with their codename.
-def create_file(filename):
- dirname = os.path.dirname(filename)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
- with open(filename, 'w') as fp:
- fp.write(str(uuid.uuid4()))
+ :returns: A 2-tuple. The first entry, the :class:`db.Source`
+ initialized. The second, their codename string.
+ """
+ # Create source identity and database record
+ codename = crypto_util.genrandomid()
+ filesystem_id = crypto_util.hash_codename(codename)
+ journalist_filename = crypto_util.display_id()
+ source = Source(filesystem_id, journalist_filename)
+ db_session.add(source)
+ db_session.commit()
+ # Create the directory to store their submissions and replies
+ os.mkdir(store.path(source.filesystem_id))
+ # Generate their key, blocking for as long as necessary
+ crypto_util.genkeypair(source.filesystem_id, codename)
+ return source, codename
-def setup_test_docs(sid, files):
- filenames = [os.path.join(config.STORE_DIR, sid, file) for file in files]
+ # NOTE: this method is potentially dangerous to rely on for now due
+ # to the fact flask_testing.TestCase only uses on request context
+ # per method (see
+ # https://github.com/freedomofpress/securedrop/issues/1444).
+ @staticmethod
+ def new_codename(client, session):
+ """Helper function to go through the "generate codename" flow.
+ """
+ with client as c:
+ c.get('/generate')
+ codename = session['codename']
+ c.post('/create')
+ return codename
- for filename in filenames:
- create_file(filename)
- # Add Submission to the db
- source = Source.query.filter(Source.filesystem_id == sid).one()
- submission = Submission(source, os.path.basename(filename))
- db_session.add(submission)
+class TestSubmission:
+ """Test fixtures for working with :class:`db.Submission`.
+ """
+ @staticmethod
+ def submit(source, num_submissions):
+ """Generates and submits *num_submissions*
+ :class:`db.Submission`s on behalf of a :class:`db.Source`
+ *source*.
+
+ :param db.Source source: The source on who's behalf to make
+ submissions.
+
+ :param int num_submissions: Number of random-data submissions
+ to make.
+
+ :returns: A list of the :class:`db.Submission`s submitted.
+ """
+ assert num_submissions >= 1
+ submissions = []
+ for _ in range(num_submissions):
+ source.interaction_count += 1
+ fpath = store.save_message_submission(source.filesystem_id,
+ source.interaction_count,
+ source.journalist_filename,
+ str(os.urandom(1)))
+ submission = Submission(source, fpath)
+ submissions.append(submission)
+ db_session.add(submission)
+
+ db_session.commit()
+ return submissions
+
+ @staticmethod
+ def mark_downloaded(*submissions):
+ for submission in submissions:
+ submission.downloaded = True
db_session.commit()
- return filenames
+class TestReply:
+ """Test fixtures for working with :class:`db.Reply`.
+ """
+ @staticmethod
+ def reply(journalist, source, num_replies):
+ """Generates and submits *num_replies* replies to *source*
+ :class:`db.Source`.
-def setup_test_replies(sid, journo_id, files):
- filenames = [os.path.join(config.STORE_DIR, sid, file) for file in files]
+ :param db.Journalist journalist: The journalist to write the
+ reply from.
- for filename in filenames:
- create_file(filename)
+ :param db.Source source: The source to send the reply to.
+
+ :param int num_replies: Number of random-data replies to make.
+
+ :returns: A list of the :class:`db.Reply`s submitted.
+ """
+ assert num_replies >= 1
+ replies = []
+ for _ in range(num_replies):
+ source.interaction_count += 1
+ fname = "{}-{}-reply.gpg".format(source.interaction_count,
+ source.journalist_filename)
+ crypto_util.encrypt(str(os.urandom(1)),
+ [
+ crypto_util.getkey(source.filesystem_id),
+ config.JOURNALIST_KEY
+ ],
+ store.path(source.filesystem_id, fname))
+ reply = Reply(journalist, source, fname)
+ replies.append(reply)
+ db_session.add(reply)
- # Add Reply to the db
- source = Source.query.filter(Source.filesystem_id == sid).one()
- journalist = Journalist.query.filter(Journalist.id == journo_id).one()
- reply = Reply(journalist, source, os.path.basename(filename))
- db_session.add(reply)
db_session.commit()
+ return replies
+
+
+class SetUp:
+ """Test fixtures for initializing a generic test environment.
+ """
+ # TODO: the PID file for the redis worker is hard-coded below.
+ # Ideally this constant would be provided by a test harness.
+ # It has been intentionally omitted from `config.py.example`
+ # in order to isolate the test vars from prod vars.
+ # When refactoring the test suite, the test_worker_pidfile
+ # test_worker_pidfile is also hard-coded in `manage.py`.
+ test_worker_pidfile = "/tmp/securedrop_test_worker.pid"
+
+ @staticmethod
+ def create_directories():
+ # Create directories for the file store and the GPG keyring
+ for d in (config.SECUREDROP_DATA_ROOT, config.STORE_DIR,
+ config.GPG_KEY_DIR, config.TEMP_DIR):
+ if not os.path.isdir(d):
+ os.mkdir(d)
+
+ @staticmethod
+ def init_gpg():
+ # Initialize the GPG keyring
+ gpg = gnupg.GPG(homedir=config.GPG_KEY_DIR)
+ # Import the journalist key for testing (faster to import a pre-generated
+ # key than to gen a new one every time)
+ for keyfile in ("test_journalist_key.pub", "test_journalist_key.sec"):
+ gpg.import_keys(open(keyfile).read())
+ return gpg
+
+ @classmethod
+ def setup(cls):
+ """Set up the file system, GPG, and database."""
+ SetUp.create_directories()
+ SetUp.init_gpg()
+ init_db()
+ # Do tests that should always run on app startup
+ crypto_util.do_runtime_tests()
+ # Start the Python-RQ worker if it's not already running
+ if not os.path.exists(cls.test_worker_pidfile):
+ subprocess.Popen(["rqworker",
+ "-P", config.SECUREDROP_ROOT,
+ "--pid", cls.test_worker_pidfile])
+
+
+class TearDown:
+ """Test fixtures for tearing down a generic test environment.
+ """
+ @staticmethod
+ def teardown():
+ shutil.rmtree(config.SECUREDROP_DATA_ROOT)
+
+ # TODO: now that SD has a logout button, we can deprecate use of
+ # this function.
+ @staticmethod
+ def logout(test_client):
+ with test_client.session_transaction() as sess:
+ sess.clear()
+
+
+class Async:
+ """Test fixtures for use with asynchronous processes.
+ """
+ redis_success_return_value = 'success'
- return filenames
+ @classmethod
+ def wait_for_redis_worker(cls, job, timeout=5):
+ """Raise an error if the Redis job doesn't complete successfully
+ before a timeout.
+ :param rq.job.Job job: A Redis job to wait for.
-def new_codename(client, session):
- """Helper function to go through the "generate codename" flow"""
- with client as c:
- rv = c.get('/generate')
- codename = session['codename']
- rv = c.post('/create')
- return codename
+ :param int timeout: Seconds to wait for the job to finish.
+ :raises: An :exc:`AssertionError`.
+ """
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ if job.result == cls.redis_success_return_value:
+ return
+ elif job.result not in (None, cls.redis_success_return_value):
+ assert False, 'Redis worker failed!'
+ assert False, 'Redis worker timed out!'
-def shared_setup():
- """Set up the file system, GPG, and database"""
- create_directories()
- init_gpg()
- init_db()
+ @staticmethod
+ def wait_for_function_result(f, timeout=5):
+ """Wraps function *f*: calls *f* until *f* returns a non-None
+ value, or timeout is reached.
- # Do tests that should always run on app startup
- crypto_util.do_runtime_tests()
+ :param function f: The function to run.
- # Start the Python-RQ worker if it's not already running
- if not os.path.exists(TEST_WORKER_PIDFILE):
- subprocess.Popen(["rqworker", "-P", config.SECUREDROP_ROOT,
- "--pid", TEST_WORKER_PIDFILE])
+ :param int timeout: Seconds to wait for the function to return.
+ :raises: An :exc:`AssertionError`.
+ """
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ result = f(*args, **kwargs)
+ if result is not None:
+ return result
+ assert False, 'Asynchronous function timeout!'
+ return wrapper
-def shared_teardown():
- clean_root()
+ @staticmethod
+ def wait_for_assertion(assertion_expression, timeout=5):
+ """Calls an assertion_expression repeatedly, until the assertion
+ passes or a timeout is reached.
+ :param assertion_expression: An assertion expression. Generally
+ a call to a
+ :class:`unittest.TestCase` method.
-def logout(test_client):
- # See http://flask.pocoo.org/docs/testing/#accessing-and-modifying-sessions
- # This is necessary because SecureDrop doesn't have a logout button, so a
- # user is logged in until they close the browser, which clears the session.
- # For testing, this function simulates closing the browser at places
- # where a source is likely to do so (for instance, between submitting a
- # document and checking for a journalist reply).
- with test_client.session_transaction() as sess:
- sess.clear()
+ :param int timeout: Seconds to wait for the function to return.
+ """
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ try:
+ return assertion_expression()
+ except AssertionError:
+ pass
+ # one more try, which will raise any errors if they are outstanding
+ return assertion_expression()
diff --git a/securedrop/tests/functional/functional_test.py b/securedrop/tests/functional/functional_test.py
index e6b82b8f7c3..014b1d1679e 100644
--- a/securedrop/tests/functional/functional_test.py
+++ b/securedrop/tests/functional/functional_test.py
@@ -1,23 +1,26 @@
-import unittest
+# -*- coding: utf-8 -*-
+
+import gnupg
+import os
+from multiprocessing import Process
from selenium import webdriver
-from selenium.webdriver.firefox import firefox_binary
from selenium.common.exceptions import WebDriverException
-from multiprocessing import Process
-import socket
+from selenium.webdriver.firefox import firefox_binary
import shutil
-import os
-import gnupg
-import urllib2
+import socket
import sys
+import unittest
+import urllib2
# Set environment variable so config.py uses a test environment
os.environ['SECUREDROP_ENV'] = 'test'
-import config
+import db
+import config
import source
import journalist
-from tests import common
+from tests.common import SetUp, TearDown
import urllib2
import signal
@@ -54,9 +57,9 @@ def setUp(self):
signal.signal(signal.SIGUSR1, lambda _, s: traceback.print_stack(s))
- common.create_directories()
- self.gpg = common.init_gpg()
- common.init_db()
+ SetUp.create_directories()
+ self.gpg = SetUp.init_gpg()
+ db.init_db()
source_port = self._unused_port()
journalist_port = self._unused_port()
@@ -94,7 +97,7 @@ def start_journalist_server():
self.secret_message = 'blah blah blah'
def tearDown(self):
- common.clean_root()
+ TearDown.teardown()
self.driver.quit()
self.source_process.terminate()
self.journalist_process.terminate()
diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py
index db988886bc6..948bb758593 100644
--- a/securedrop/tests/test_journalist.py
+++ b/securedrop/tests/test_journalist.py
@@ -1,12 +1,18 @@
-import unittest
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
from mock import patch, ANY, MagicMock
+import unittest
+# Set environment variable so config.py uses a test environment
+os.environ['SECUREDROP_ENV'] = 'test'
+
+from common import SetUp, TearDown, TestJournalist
+from db import db_session, InvalidPasswordLength, Journalist
import journalist
-import common
-from db import Journalist, InvalidPasswordLength, db_session
-class TestJournalist(unittest.TestCase):
+class TestJournalistApp(unittest.TestCase):
def setUp(self):
journalist.logged_in = MagicMock()
@@ -18,10 +24,15 @@ def setUp(self):
journalist.get_docs = MagicMock()
journalist.get_or_else = MagicMock()
+ def _set_up_request(self, cols_selected, action):
+ journalist.request.form.__contains__.return_value = True
+ journalist.request.form.getlist = MagicMock(return_value=cols_selected)
+ journalist.request.form.__getitem__.return_value = action
+
@patch("journalist.col_delete")
def test_col_process_delegates_to_col_delete(self, col_delete):
cols_selected = ['source_id']
- self.set_up_request(cols_selected, 'delete')
+ self._set_up_request(cols_selected, 'delete')
journalist.col_process()
@@ -30,7 +41,7 @@ def test_col_process_delegates_to_col_delete(self, col_delete):
@patch("journalist.col_star")
def test_col_process_delegates_to_col_star(self, col_star):
cols_selected = ['source_id']
- self.set_up_request(cols_selected, 'star')
+ self._set_up_request(cols_selected, 'star')
journalist.col_process()
@@ -39,7 +50,7 @@ def test_col_process_delegates_to_col_star(self, col_star):
@patch("journalist.col_un_star")
def test_col_process_delegates_to_col_un_star(self, col_un_star):
cols_selected = ['source_id']
- self.set_up_request(cols_selected, 'un-star')
+ self._set_up_request(cols_selected, 'un-star')
journalist.col_process()
@@ -48,7 +59,7 @@ def test_col_process_delegates_to_col_un_star(self, col_un_star):
@patch("journalist.abort")
def test_col_process_returns_404_with_bad_action(self, abort):
cols_selected = ['source_id']
- self.set_up_request(cols_selected, 'something-random')
+ self._set_up_request(cols_selected, 'something-random')
journalist.col_process()
@@ -67,10 +78,6 @@ def test_col_un_star_call_db(self, db_session):
db_session.commit.assert_called_with()
- def set_up_request(self, cols_selected, action):
- journalist.request.form.__contains__.return_value = True
- journalist.request.form.getlist = MagicMock(return_value=cols_selected)
- journalist.request.form.__getitem__.return_value = action
@classmethod
def tearDownClass(cls):
@@ -78,33 +85,19 @@ def tearDownClass(cls):
# break other tests
reload(journalist)
-
+seshy = None
class TestJournalistLogin(unittest.TestCase):
def setUp(self):
- common.shared_setup()
+ SetUp.setup()
# Patch the two-factor verification so it always succeeds
- patcher = patch('db.Journalist.verify_token')
- self.addCleanup(patcher.stop)
- self.mock_journalist_verify_token = patcher.start()
- self.mock_journalist_verify_token.return_value = True
-
- self.username = "test user"
- self.password = "test password"
- self.user = Journalist(
- username=self.username,
- password=self.password)
- db_session.add(self.user)
- db_session.commit()
-
- # Use a patched login function to avoid dealing with two-factor tokens
- # (which are being ignored here anyway)
- self.login = lambda username, password: \
- Journalist.login(username, password, "")
+ TestJournalist.mock_verify_token(self)
+
+ self.user, self.user_pw = TestJournalist.init_journalist()
def tearDown(self):
- common.shared_teardown()
+ TearDown.teardown()
# TODO: figure out why this is necessary here, but unnecessary in all
# of the tests in `tests/test_unit_*.py`. Without this, the session
# continues to return values even if the underlying database is deleted
@@ -113,19 +106,16 @@ def tearDown(self):
@patch('db.Journalist._scrypt_hash')
@patch('db.Journalist.valid_password', return_value=True)
- def test_login_with_valid_length_password_calls_scrypt(
- self, mock_scrypt_hash, mock_valid_password):
- self.login(self.username, self.password)
+ def test_login_calls_scrypt(self, mock_scrypt_hash, mock_valid_password):
+ Journalist.login(self.user.username, self.user_pw, 'mocked')
self.assertTrue(mock_scrypt_hash.called,
"Failed to call _scrypt_hash for password w/ valid length")
@patch('db.Journalist._scrypt_hash')
- def test_login_with_invalid_length_password_doesnt_call_scrypt(
- self, mock_scrypt_hash):
- print "test_login_with_invalid_length_password_calls_scrypt"
+ def test_invalid_login_doesnt_call_scrypt(self, mock_scrypt_hash):
invalid_pw = 'a'*(Journalist.MAX_PASSWORD_LEN + 1)
with self.assertRaises(InvalidPasswordLength):
- self.login(self.username, invalid_pw)
+ Journalist.login(self.user.username, invalid_pw, 'mocked')
self.assertFalse(mock_scrypt_hash.called,
"Called _scrypt_hash for password w/ invalid length")
@@ -134,4 +124,3 @@ def tearDownClass(cls):
# Reset the module variables that were changed to mocks so we don't
# break other tests
reload(journalist)
-
diff --git a/securedrop/tests/test_unit_crypto_util.py b/securedrop/tests/test_unit_crypto_util.py
index 083fea90071..06a059cd9c8 100644
--- a/securedrop/tests/test_unit_crypto_util.py
+++ b/securedrop/tests/test_unit_crypto_util.py
@@ -1,11 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+
import os
import unittest
+
# Set environment variable so config.py uses a test environment
os.environ['SECUREDROP_ENV'] = 'test'
+
import config
-import common
+from common import SetUp, TearDown
import crypto_util
@@ -14,10 +17,10 @@ class TestCryptoUtil(unittest.TestCase):
"""The set of tests for crypto_util.py."""
def setUp(self):
- common.shared_setup()
+ SetUp.setup()
def tearDown(self):
- common.shared_teardown()
+ TearDown.teardown()
def test_clean(self):
with self.assertRaises(crypto_util.CryptoException):
diff --git a/securedrop/tests/test_unit_integration.py b/securedrop/tests/test_unit_integration.py
index a2896308940..877d41bd5e2 100644
--- a/securedrop/tests/test_unit_integration.py
+++ b/securedrop/tests/test_unit_integration.py
@@ -1,40 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+
+from bs4 import BeautifulSoup
+from cStringIO import StringIO
+from flask import session, g, escape
+import gnupg
+import gzip
+import mock
import os
-import unittest
import re
-from cStringIO import StringIO
-import zipfile
-from time import sleep
-import tempfile
import shutil
+import tempfile
import time
-import gzip
-
-import mock
-
-import gnupg
-from flask import session, g, escape
-from bs4 import BeautifulSoup
+import unittest
+import zipfile
# Set environment variable so config.py uses a test environment
os.environ['SECUREDROP_ENV'] = 'test'
+
+from common import Async, SetUp, TearDown
import config
import crypto_util
-import source
-import journalist
-import common
from db import db_session, Journalist
+import journalist
+import source
import store
-
-def _block_on_reply_keypair_gen(codename):
- sid = crypto_util.hash_codename(codename)
- while not crypto_util.getkey(sid):
- sleep(0.1)
-
-
class TestIntegration(unittest.TestCase):
def _login_user(self):
@@ -44,21 +36,8 @@ def _login_user(self):
token='mocked'),
follow_redirects=True)
- def _wait_for(self, function_with_assertion, timeout=5):
- """Polling wait for an arbitrary assertion."""
- # Thanks to
- # http://chimera.labs.oreilly.com/books/1234000000754/ch20.html#_a_common_selenium_problem_race_conditions
- start_time = time.time()
- while time.time() - start_time < timeout:
- try:
- return function_with_assertion()
- except AssertionError:
- time.sleep(0.1)
- # one more try, which will raise any errors if they are outstanding
- return function_with_assertion()
-
def setUp(self):
- common.shared_setup()
+ SetUp.setup()
self.source_app = source.app.test_client()
self.journalist_app = journalist.app.test_client()
@@ -81,30 +60,30 @@ def setUp(self):
self._login_user()
def tearDown(self):
- common.shared_teardown()
+ TearDown.teardown()
def test_submit_message(self):
"""When a source creates an account, test that a new entry appears in the journalist interface"""
test_msg = "This is a test message."
with self.source_app as source_app:
- rv = source_app.get('/generate')
- rv = source_app.post('/create', follow_redirects=True)
+ resp = source_app.get('/generate')
+ resp = source_app.post('/create', follow_redirects=True)
codename = session['codename']
sid = g.sid
# redirected to submission form
- rv = self.source_app.post('/submit', data=dict(
+ resp = self.source_app.post('/submit', data=dict(
msg=test_msg,
fh=(StringIO(''), ''),
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- common.logout(source_app)
+ self.assertEqual(resp.status_code, 200)
+ source_app.get('/logout')
# Request the Document Interface index
- rv = self.journalist_app.get('/')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Sources", rv.data)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get('/')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Sources", resp.data)
+ soup = BeautifulSoup(resp.data)
# The source should have a "download unread" link that says "1 unread"
col = soup.select('ul#cols > li')[0]
@@ -112,58 +91,58 @@ def test_submit_message(self):
self.assertIn("1 unread", unread_span.get_text())
col_url = soup.select('ul#cols > li a')[0]['href']
- rv = self.journalist_app.get(col_url)
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get(col_url)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
submission_url = soup.select('ul#submissions li a')[0]['href']
self.assertIn("-msg", submission_url)
span = soup.select('ul#submissions li span.info span')[0]
self.assertRegexpMatches(span['title'], "\d+ bytes")
- rv = self.journalist_app.get(submission_url)
- self.assertEqual(rv.status_code, 200)
- decrypted_data = self.gpg.decrypt(rv.data)
+ resp = self.journalist_app.get(submission_url)
+ self.assertEqual(resp.status_code, 200)
+ decrypted_data = self.gpg.decrypt(resp.data)
self.assertTrue(decrypted_data.ok)
self.assertEqual(decrypted_data.data, test_msg)
# delete submission
- rv = self.journalist_app.get(col_url)
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get(col_url)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
doc_name = soup.select(
'ul > li > input[name="doc_names_selected"]')[0]['value']
- rv = self.journalist_app.post('/bulk', data=dict(
+ resp = self.journalist_app.post('/bulk', data=dict(
action='confirm_delete',
sid=sid,
doc_names_selected=doc_name
))
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
- self.assertIn("The following file has been selected for", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
+ self.assertIn("The following file has been selected for", resp.data)
# confirm delete submission
doc_name = soup.select
doc_name = soup.select(
'ul > li > input[name="doc_names_selected"]')[0]['value']
- rv = self.journalist_app.post('/bulk', data=dict(
+ resp = self.journalist_app.post('/bulk', data=dict(
action='delete',
sid=sid,
doc_names_selected=doc_name,
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
- self.assertIn("Submission deleted.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
+ self.assertIn("Submission deleted.", resp.data)
# confirm that submission deleted and absent in list of submissions
- rv = self.journalist_app.get(col_url)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("No documents to display.", rv.data)
+ resp = self.journalist_app.get(col_url)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("No documents to display.", resp.data)
# the file should be deleted from the filesystem
# since file deletion is handled by a polling worker, this test needs
# to wait for the worker to get the job and execute it
- self._wait_for(
+ Async.wait_for_assertion(
lambda: self.assertFalse(os.path.exists(store.path(sid, doc_name)))
)
@@ -173,22 +152,22 @@ def test_submit_file(self):
test_filename = "test.txt"
with self.source_app as source_app:
- rv = source_app.get('/generate')
- rv = source_app.post('/create', follow_redirects=True)
+ resp = source_app.get('/generate')
+ resp = source_app.post('/create', follow_redirects=True)
codename = session['codename']
sid = g.sid
# redirected to submission form
- rv = self.source_app.post('/submit', data=dict(
+ resp = self.source_app.post('/submit', data=dict(
msg="",
fh=(StringIO(test_file_contents), test_filename),
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- common.logout(source_app)
+ self.assertEqual(resp.status_code, 200)
+ source_app.get('/logout')
- rv = self.journalist_app.get('/')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Sources", rv.data)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get('/')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Sources", resp.data)
+ soup = BeautifulSoup(resp.data)
# The source should have a "download unread" link that says "1 unread"
col = soup.select('ul#cols > li')[0]
@@ -196,17 +175,17 @@ def test_submit_file(self):
self.assertIn("1 unread", unread_span.get_text())
col_url = soup.select('ul#cols > li a')[0]['href']
- rv = self.journalist_app.get(col_url)
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get(col_url)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
submission_url = soup.select('ul#submissions li a')[0]['href']
self.assertIn("-doc", submission_url)
span = soup.select('ul#submissions li span.info span')[0]
self.assertRegexpMatches(span['title'], "\d+ bytes")
- rv = self.journalist_app.get(submission_url)
- self.assertEqual(rv.status_code, 200)
- decrypted_data = self.gpg.decrypt(rv.data)
+ resp = self.journalist_app.get(submission_url)
+ self.assertEqual(resp.status_code, 200)
+ decrypted_data = self.gpg.decrypt(resp.data)
self.assertTrue(decrypted_data.ok)
sio = StringIO(decrypted_data.data)
@@ -215,43 +194,43 @@ def test_submit_file(self):
self.assertEqual(unzipped_decrypted_data, test_file_contents)
# delete submission
- rv = self.journalist_app.get(col_url)
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get(col_url)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
doc_name = soup.select(
'ul > li > input[name="doc_names_selected"]')[0]['value']
- rv = self.journalist_app.post('/bulk', data=dict(
+ resp = self.journalist_app.post('/bulk', data=dict(
action='confirm_delete',
sid=sid,
doc_names_selected=doc_name
))
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
- self.assertIn("The following file has been selected for", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
+ self.assertIn("The following file has been selected for", resp.data)
# confirm delete submission
doc_name = soup.select
doc_name = soup.select(
'ul > li > input[name="doc_names_selected"]')[0]['value']
- rv = self.journalist_app.post('/bulk', data=dict(
+ resp = self.journalist_app.post('/bulk', data=dict(
action='delete',
sid=sid,
doc_names_selected=doc_name,
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
- self.assertIn("Submission deleted.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
+ self.assertIn("Submission deleted.", resp.data)
# confirm that submission deleted and absent in list of submissions
- rv = self.journalist_app.get(col_url)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("No documents to display.", rv.data)
+ resp = self.journalist_app.get(col_url)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("No documents to display.", resp.data)
# the file should be deleted from the filesystem
# since file deletion is handled by a polling worker, this test needs
# to wait for the worker to get the job and execute it
- self._wait_for(
+ Async.wait_for_assertion(
lambda: self.assertFalse(os.path.exists(store.path(sid, doc_name)))
)
@@ -308,89 +287,98 @@ def _can_decrypt_with_key(self, msg, key_fpr, passphrase=None):
# We have to clean up the temporary GPG dir
shutil.rmtree(gpg_tmp_dir)
+ def _block_on_reply_keypair_gen(self, filesystem_id, timeout):
+ """Wait for the asymmetric keypair associated with the
+ :class:`db.Source` with :attr:`db.Source.filesystem_id`
+ *filesystem_id* to appear in the GPG keyring, or timeout after
+ *timeout* seconds."""
+ def get_reply_keypair(filesystem_id):
+ return crypto_util.getkey(filesystem_id)
+ return Async.wait_for_function_result(get_reply_keypair)
+
def helper_test_reply(self, test_reply, expected_success=True):
test_msg = "This is a test message."
with self.source_app as source_app:
- rv = source_app.get('/generate')
- rv = source_app.post('/create', follow_redirects=True)
+ resp = source_app.get('/generate')
+ resp = source_app.post('/create', follow_redirects=True)
codename = session['codename']
sid = g.sid
# redirected to submission form
- rv = source_app.post('/submit', data=dict(
+ resp = source_app.post('/submit', data=dict(
msg=test_msg,
fh=(StringIO(''), ''),
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
self.assertFalse(g.source.flagged)
- common.logout(source_app)
+ source_app.get('/logout')
- rv = self.journalist_app.get('/')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Sources", rv.data)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get('/')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Sources", resp.data)
+ soup = BeautifulSoup(resp.data)
col_url = soup.select('ul#cols > li a')[0]['href']
- rv = self.journalist_app.get(col_url)
- self.assertEqual(rv.status_code, 200)
+ resp = self.journalist_app.get(col_url)
+ self.assertEqual(resp.status_code, 200)
with self.source_app as source_app:
- rv = source_app.post('/login', data=dict(
+ resp = source_app.post('/login', data=dict(
codename=codename), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
self.assertFalse(g.source.flagged)
- common.logout(source_app)
+ source_app.get('/logout')
with self.journalist_app as journalist_app:
- rv = journalist_app.post('/flag', data=dict(
+ resp = journalist_app.post('/flag', data=dict(
sid=sid))
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
with self.source_app as source_app:
- rv = source_app.post('/login', data=dict(
+ resp = source_app.post('/login', data=dict(
codename=codename), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
self.assertTrue(g.source.flagged)
source_app.get('/lookup')
self.assertTrue(g.source.flagged)
- common.logout(source_app)
+ source_app.get('/logout')
- # Block until the reply keypair has been generated, so we can test
- # sending a reply
- _block_on_reply_keypair_gen(codename)
+ # Block up to 15s for the reply keypair, so we can test sending a reply
+ filesystem_id = crypto_util.hash_codename(codename)
+ self._block_on_reply_keypair_gen(sid, 15)
# Create 2 replies to test deleting on journalist and source interface
for i in range(2):
- rv = self.journalist_app.post('/reply', data=dict(
+ resp = self.journalist_app.post('/reply', data=dict(
sid=sid,
msg=test_reply
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
if not expected_success:
pass
else:
- self.assertIn("Thanks! Your reply has been stored.", rv.data)
+ self.assertIn("Thanks! Your reply has been stored.", resp.data)
with self.journalist_app as journalist_app:
- rv = journalist_app.get(col_url)
- self.assertIn("reply-", rv.data)
+ resp = journalist_app.get(col_url)
+ self.assertIn("reply-", resp.data)
- soup = BeautifulSoup(rv.data)
+ soup = BeautifulSoup(resp.data)
# Download the reply and verify that it can be decrypted with the
# journalist's key as well as the source's reply key
sid = soup.select('input[name="sid"]')[0]['value']
checkbox_values = [
soup.select('input[name="doc_names_selected"]')[1]['value']]
- rv = self.journalist_app.post('/bulk', data=dict(
+ resp = self.journalist_app.post('/bulk', data=dict(
sid=sid,
action='download',
doc_names_selected=checkbox_values
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
- zf = zipfile.ZipFile(StringIO(rv.data), 'r')
+ zf = zipfile.ZipFile(StringIO(resp.data), 'r')
data = zf.read(zf.namelist()[0])
self._can_decrypt_with_key(data, config.JOURNALIST_KEY)
self._can_decrypt_with_key(data, crypto_util.getkey(sid), codename)
@@ -401,39 +389,39 @@ def helper_test_reply(self, test_reply, expected_success=True):
self.helper_filenames_delete(soup, last_reply_number)
with self.source_app as source_app:
- rv = source_app.post('/login', data=dict(codename=codename),
+ resp = source_app.post('/login', data=dict(codename=codename),
follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- rv = source_app.get('/lookup')
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
+ resp = source_app.get('/lookup')
+ self.assertEqual(resp.status_code, 200)
if not expected_success:
# there should be no reply
- self.assertNotIn("You have received a reply.", rv.data)
+ self.assertNotIn("You have received a reply.", resp.data)
else:
self.assertIn(
"You have received a reply. For your security, please delete all replies when you're done with them.",
- rv.data)
- self.assertIn(test_reply, rv.data)
- soup = BeautifulSoup(rv.data)
+ resp.data)
+ self.assertIn(test_reply, resp.data)
+ soup = BeautifulSoup(resp.data)
msgid = soup.select(
'form.message > input[name="reply_filename"]')[0]['value']
- rv = source_app.post('/delete', data=dict(
+ resp = source_app.post('/delete', data=dict(
sid=sid,
reply_filename=msgid
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Reply deleted", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Reply deleted", resp.data)
# Make sure the reply is deleted from the filesystem
- self._wait_for(
+ Async.wait_for_assertion(
lambda: self.assertFalse(
os.path.exists(
store.path(
sid,
msgid))))
- common.logout(source_app)
+ source_app.get('/logout')
def test_delete_collection(self):
"""Test the "delete collection" button on each collection page"""
@@ -445,27 +433,27 @@ def test_delete_collection(self):
fh=(StringIO(''), ''),
), follow_redirects=True)
- rv = self.journalist_app.get('/')
+ resp = self.journalist_app.get('/')
# navigate to the collection page
- soup = BeautifulSoup(rv.data)
+ soup = BeautifulSoup(resp.data)
first_col_url = soup.select('ul#cols > li a')[0]['href']
- rv = self.journalist_app.get(first_col_url)
- self.assertEqual(rv.status_code, 200)
+ resp = self.journalist_app.get(first_col_url)
+ self.assertEqual(resp.status_code, 200)
# find the delete form and extract the post parameters
- soup = BeautifulSoup(rv.data)
+ soup = BeautifulSoup(resp.data)
delete_form_inputs = soup.select('form#delete_collection')[0]('input')
sid = delete_form_inputs[1]['value']
col_name = delete_form_inputs[2]['value']
- rv = self.journalist_app.post('/col/delete/' + sid,
+ resp = self.journalist_app.post('/col/delete/' + sid,
follow_redirects=True)
- self.assertEquals(rv.status_code, 200)
+ self.assertEquals(resp.status_code, 200)
- self.assertIn(escape("%s's collection deleted" % (col_name,)), rv.data)
- self.assertIn("No documents have been submitted!", rv.data)
+ self.assertIn(escape("%s's collection deleted" % (col_name,)), resp.data)
+ self.assertIn("No documents have been submitted!", resp.data)
# Make sure the collection is deleted from the filesystem
- self._wait_for(
+ Async.wait_for_assertion(
lambda: self.assertFalse(os.path.exists(store.path(sid)))
)
@@ -481,22 +469,22 @@ def test_delete_collections(self):
msg="This is a test " + str(i) + ".",
fh=(StringIO(''), ''),
), follow_redirects=True)
- common.logout(self.source_app)
+ self.source_app.get('/logout')
- rv = self.journalist_app.get('/')
+ resp = self.journalist_app.get('/')
# get all the checkbox values
- soup = BeautifulSoup(rv.data)
+ soup = BeautifulSoup(resp.data)
checkbox_values = [checkbox['value'] for checkbox in
soup.select('input[name="cols_selected"]')]
- rv = self.journalist_app.post('/col/process', data=dict(
+ resp = self.journalist_app.post('/col/process', data=dict(
action='delete',
cols_selected=checkbox_values
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("%s collections deleted" % (num_sources,), rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("%s collections deleted" % (num_sources,), resp.data)
# Make sure the collections are deleted from the filesystem
- self._wait_for(lambda: self.assertFalse(
+ Async.wait_for_assertion(lambda: self.assertFalse(
any([os.path.exists(store.path(sid)) for sid in checkbox_values])))
def test_filenames(self):
@@ -507,14 +495,14 @@ def test_filenames(self):
self.helper_filenames_submit()
# navigate to the collection page
- rv = self.journalist_app.get('/')
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get('/')
+ soup = BeautifulSoup(resp.data)
first_col_url = soup.select('ul#cols > li a')[0]['href']
- rv = self.journalist_app.get(first_col_url)
- self.assertEqual(rv.status_code, 200)
+ resp = self.journalist_app.get(first_col_url)
+ self.assertEqual(resp.status_code, 200)
# test filenames and sort order
- soup = BeautifulSoup(rv.data)
+ soup = BeautifulSoup(resp.data)
submission_filename_re = r'^{0}-[a-z0-9-_]+(-msg|-doc\.gz)\.gpg$'
for i, submission_link in enumerate(
soup.select('ul#submissions li a .filename')):
@@ -530,17 +518,17 @@ def test_filenames_delete(self):
self.helper_filenames_submit()
# navigate to the collection page
- rv = self.journalist_app.get('/')
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get('/')
+ soup = BeautifulSoup(resp.data)
first_col_url = soup.select('ul#cols > li a')[0]['href']
- rv = self.journalist_app.get(first_col_url)
- self.assertEqual(rv.status_code, 200)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get(first_col_url)
+ self.assertEqual(resp.status_code, 200)
+ soup = BeautifulSoup(resp.data)
# delete file #2
self.helper_filenames_delete(soup, 1)
- rv = self.journalist_app.get(first_col_url)
- soup = BeautifulSoup(rv.data)
+ resp = self.journalist_app.get(first_col_url)
+ soup = BeautifulSoup(resp.data)
# test filenames and sort order
submission_filename_re = r'^{0}-[a-z0-9-_]+(-msg|-doc\.gz)\.gpg$'
@@ -564,15 +552,15 @@ def test_user_change_password(self):
))
# logout
- common.logout(self.journalist_app)
+ self.journalist_app.get('/logout')
# login with new credentials should redirect to index page
- rv = self.journalist_app.post('/login', data=dict(
+ resp = self.journalist_app.post('/login', data=dict(
username=self.user.username,
password='newpass',
token='mocked',
follow_redirects=True))
- self.assertEqual(rv.status_code, 302)
+ self.assertEqual(resp.status_code, 302)
def test_login_after_regenerate_hotp(self):
"""Test that journalists can login after resetting their HOTP 2fa"""
@@ -582,20 +570,20 @@ def test_login_after_regenerate_hotp(self):
otp_secret=123456))
# successful verificaton should redirect to /account
- rv = self.journalist_app.post('/account/2fa', data=dict(
+ resp = self.journalist_app.post('/account/2fa', data=dict(
token=self.user.hotp))
- self.assertEqual(rv.status_code, 302)
+ self.assertEqual(resp.status_code, 302)
# log out
- common.logout(self.journalist_app)
+ self.journalist_app.get('/logout')
# login with new 2fa secret should redirect to index page
- rv = self.journalist_app.post('/login', data=dict(
+ resp = self.journalist_app.post('/login', data=dict(
username=self.user.username,
password=self.user_pw,
token=self.user.hotp,
follow_redirects=True))
- self.assertEqual(rv.status_code, 302)
+ self.assertEqual(resp.status_code, 302)
def helper_filenames_submit(self):
self.source_app.post('/submit', data=dict(
@@ -617,27 +605,27 @@ def helper_filenames_delete(self, soup, i):
soup.select('input[name="doc_names_selected"]')[i]['value']]
# delete
- rv = self.journalist_app.post('/bulk', data=dict(
+ resp = self.journalist_app.post('/bulk', data=dict(
sid=sid,
action='confirm_delete',
doc_names_selected=checkbox_values
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
self.assertIn(
"The following file has been selected for permanent deletion",
- rv.data)
+ resp.data)
# confirm delete
- rv = self.journalist_app.post('/bulk', data=dict(
+ resp = self.journalist_app.post('/bulk', data=dict(
sid=sid,
action='delete',
doc_names_selected=checkbox_values
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Submission deleted.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Submission deleted.", resp.data)
# Make sure the files were deleted from the filesystem
- self._wait_for(lambda: self.assertFalse(
+ Async.wait_for_assertion(lambda: self.assertFalse(
any([os.path.exists(store.path(sid, doc_name)) for doc_name in checkbox_values])))
diff --git a/securedrop/tests/test_unit_journalist.py b/securedrop/tests/test_unit_journalist.py
index cf9e29a2b53..52217d8d3ed 100644
--- a/securedrop/tests/test_unit_journalist.py
+++ b/securedrop/tests/test_unit_journalist.py
@@ -1,356 +1,392 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Set environment variable so config.py uses a test environment
-os.environ['SECUREDROP_ENV'] = 'test'
-
+import crypto_util
from cStringIO import StringIO
-import datetime
from flask import url_for, escape
from flask_testing import TestCase
-import mock
import os
+import random
import time
import unittest
import zipfile
-import common
-import crypto_util
-import journalist
+# Set environment variable so config.py uses a test environment
+os.environ['SECUREDROP_ENV'] = 'test'
+
+import config
+from common import (Async, SetUp, TearDown, TestJournalist, TestReply,
+ TestSource, TestSubmission)
from db import (db_session, InvalidPasswordLength, Journalist, Reply, Source,
Submission)
+import journalist
-class TestJournalist(TestCase):
+class TestJournalistApp(TestCase):
+ # A method required by flask_testing.TestCase
def create_app(self):
return journalist.app
- def add_source_and_submissions(self):
- sid = crypto_util.hashd_codename(crypto_util.genrandomid())
- codename = crypto_util.display_id()
- crypto_util.genkeypair(sid, codename)
- source = Source(sid, codename)
- db_session.add(source)
- db_session.commit()
- files = ['1-abc1-msg.gpg', '2-abc2-msg.gpg']
- filenames = common.setup_test_docs(sid, files)
- return source, files
-
- def add_source_and_replies(self):
- source, files = self.add_source_and_submissions()
- files = ['1-def-reply.gpg', '2-def-reply.gpg']
- filenames = common.setup_test_replies(source.filesystem_id,
- self.user.id,
- files)
- return source, files
-
def setUp(self):
- common.shared_setup()
+ SetUp.setup()
# Patch the two-factor verification to avoid intermittent errors
- patcher = mock.patch('db.Journalist.verify_token')
- self.addCleanup(patcher.stop)
- self.mock_journalist_verify_token = patcher.start()
- self.mock_journalist_verify_token.return_value = True
-
- # Set up test users
- self.user_pw = "bar"
- self.user = Journalist(username="foo",
- password=self.user_pw)
- self.admin_user_pw = "admin"
- self.admin_user = Journalist(username="admin",
- password=self.admin_user_pw,
- is_admin=True)
- db_session.add(self.user)
- db_session.add(self.admin_user)
- db_session.commit()
+ TestJournalist.mock_verify_token(self)
- def tearDown(self):
- common.shared_teardown()
-
- def test_index_should_redirect_to_login(self):
- res = self.client.get(url_for('index'))
- self.assertRedirects(res, url_for('login'))
-
- def test_invalid_user_login_should_fail(self):
- res = self.client.post(url_for('login'), data=dict(
- username='invalid',
- password='invalid',
- token='mocked'))
- self.assert200(res)
- self.assertIn("Login failed", res.data)
-
- def test_valid_user_login_should_succeed(self):
- res = self.client.post(url_for('login'), data=dict(
- username=self.user.username,
- password=self.user_pw,
- token='mocked'),
- follow_redirects=True)
+ # Setup test users: user & admin
+ self.user, self.user_pw = TestJournalist.init_journalist()
+ self.admin, self.admin_pw = TestJournalist.init_admin()
- self.assert200(res) # successful login redirects to index
- self.assertIn("Sources", res.data)
- self.assertIn("No documents have been submitted!", res.data)
-
- def test_normal_and_admin_user_login_should_redirect_to_index(self):
- """Normal users and admin users should both redirect to the index page after logging in successfully"""
- res = self.client.post(url_for('login'), data=dict(
- username=self.user.username,
- password=self.user_pw,
- token='mocked'))
- self.assertRedirects(res, url_for('index'))
-
- res = self.client.post(url_for('login'), data=dict(
- username=self.admin_user.username,
- password=self.admin_user_pw,
- token='mocked'))
- self.assertRedirects(res, url_for('index'))
-
- def test_admin_user_has_admin_link_in_index(self):
- res = self.client.post(url_for('login'), data=dict(
- username=self.admin_user.username,
- password=self.admin_user_pw,
- token='mocked'),
- follow_redirects=True)
- admin_link = '{}'.format(
- url_for('admin_index'),
- "Admin")
- self.assertIn(admin_link, res.data)
-
- def test_user_has_edit_account_link_in_index(self):
- res = self.client.post(url_for('login'), data=dict(
- username=self.user.username,
- password=self.user_pw,
- token='mocked'),
- follow_redirects=True)
- edit_account_link = '{}'.format(
- url_for('edit_account'),
- "Edit Account")
- self.assertIn(edit_account_link, res.data)
+ def tearDown(self):
+ TearDown.teardown()
+
+ def test_unauthorized_access_redirects_to_login(self):
+ resp = self.client.get(url_for('index'))
+ self.assertRedirects(resp, url_for('login'))
+
+ def test_invalid_credentials(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.user.username,
+ password='invalid',
+ token='mocked'))
+ self.assert200(resp)
+ self.assertIn("Login failed", resp.data)
+
+ def test_valid_credentials(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.user.username,
+ password=self.user_pw,
+ token='mocked'),
+ follow_redirects=True)
+ self.assert200(resp) # successful login redirects to index
+ self.assertIn("Sources", resp.data)
+ self.assertIn("No documents have been submitted!", resp.data)
+
+ def test_admin_login_redirects_to_index(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.admin.username,
+ password=self.admin_pw,
+ token='mocked'))
+ self.assertRedirects(resp, url_for('index'))
+
+ def test_user_login_redirects_to_index(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.user.username,
+ password=self.user_pw,
+ token='mocked'))
+ self.assertRedirects(resp, url_for('index'))
+
+ def test_admin_has_link_to_edit_account_page_in_index_page(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.admin.username,
+ password=self.admin_pw,
+ token='mocked'),
+ follow_redirects=True)
+ edit_account_link = '{}'.format(url_for('edit_account'),
+ "Edit Account")
+ self.assertIn(edit_account_link, resp.data)
+
+ def test_user_has_link_to_edit_account_page_in_index_page(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.user.username,
+ password=self.user_pw,
+ token='mocked'),
+ follow_redirects=True)
+ edit_account_link = '{}'.format(url_for('edit_account'),
+ "Edit Account")
+ self.assertIn(edit_account_link, resp.data)
+
+
+ def test_admin_has_link_to_admin_index_page_in_index_page(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.admin.username,
+ password=self.admin_pw,
+ token='mocked'),
+ follow_redirects=True)
+ admin_link = '{}'.format(url_for('admin_index'),
+ "Admin")
+ self.assertIn(admin_link, resp.data)
+
+ def test_user_lacks_link_to_admin_index_page_in_index_page(self):
+ resp = self.client.post(url_for('login'),
+ data=dict(username=self.user.username,
+ password=self.user_pw,
+ token='mocked'),
+ follow_redirects=True)
+ admin_link = '{}'.format(url_for('admin_index'),
+ "Admin")
+ self.assertNotIn(admin_link, resp.data)
+
+ # WARNING: we are purposely doing something that would not work in
+ # production in the _login_user and _login_admin methods. This is done as a
+ # reminder to the test developer that the flask_testing.TestCase only uses
+ # one request context per method (see
+ # https://github.com/freedomofpress/securedrop/issues/1444). By explicitly
+ # making a point of this, we hope to avoid the introduction of new tests,
+ # that do not truly prove their result because of this disconnect between
+ # request context in Flask Testing and production.
+ #
+ # TODO: either ditch Flask Testing or subclass it as discussed in the
+ # aforementioned issue to fix the described problem.
+ def _login_admin(self):
+ self._ctx.g.user = self.admin
def _login_user(self):
- self.client.post(url_for('login'), data=dict(
- username=self.user.username,
- password=self.user_pw,
- token='mocked'),
- follow_redirects=True)
+ self._ctx.g.user = self.user
- def _login_admin(self):
- self.client.post(url_for('login'), data=dict(
- username=self.admin_user.username,
- password=self.admin_user_pw,
- token='mocked'),
- follow_redirects=True)
+ def test_admin_logout_redirects_to_index(self):
+ self._login_admin()
+ resp = self.client.get(url_for('logout'))
+ self.assertRedirects(resp, url_for('index'))
- def test_user_logout(self):
+ def test_user_logout_redirects_to_index(self):
self._login_user()
- res = self.client.get(url_for('logout'))
- self.assertRedirects(res, url_for('index'))
-
- def test_admin_logout(self):
- self._login_admin()
- res = self.client.get(url_for('logout'))
- self.assertRedirects(res, url_for('index'))
+ resp = self.client.get(url_for('logout'))
+ self.assertRedirects(resp, url_for('index'))
def test_admin_index(self):
self._login_admin()
- res = self.client.get(url_for('admin_index'))
- self.assert200(res)
- self.assertIn("Admin Interface", res.data)
+ resp = self.client.get(url_for('admin_index'))
+ self.assert200(resp)
+ self.assertIn("Admin Interface", resp.data)
def test_admin_delete_user(self):
+ # Verify journalist is in the database
+ self.assertNotEqual(Journalist.query.get(self.user.id), None)
+
self._login_admin()
+ resp = self.client.post(url_for('admin_delete_user', user_id=self.user.id),
+ follow_redirects=True)
- res = self.client.post(
- url_for('admin_delete_user', user_id=self.user.id),
- follow_redirects=True)
- self.assert200(res)
+ # Assert correct interface behavior
+ self.assert200(resp)
self.assertIn(escape("Deleted user '{}'".format(self.user.username)),
- res.data)
-
- # verify journalist foo is no longer in the database
- user = Journalist.query.get(self.user.id)
- self.assertEqual(user, None)
+ resp.data)
+ # Verify journalist is no longer in the database
+ self.assertEqual(Journalist.query.get(self.user.id), None)
- def test_admin_delete_invalid_user(self):
+ def test_admin_deletes_invalid_user_404(self):
self._login_admin()
-
invalid_user_pk = max([user.id for user in Journalist.query.all()]) + 1
- res = self.client.post(url_for('admin_delete_user',
+ resp = self.client.post(url_for('admin_delete_user',
user_id=invalid_user_pk))
- self.assert404(res)
+ self.assert404(resp)
- def test_admin_edits_user_password_valid(self):
+ def test_admin_edits_user_password_success_response(self):
self._login_admin()
- res = self.client.post(
+ resp = self.client.post(
url_for('admin_edit_user', user_id=self.user.id),
- data=dict(username='foo', is_admin=False,
+ data=dict(username=self.user.username, is_admin=False,
password='valid', password_again='valid'))
- self.assertIn('Password successfully changed', res.data)
+ self.assertIn('Password successfully changed', resp.data)
- def test_admin_edits_user_password_dont_match(self):
+ def test_user_edits_password_success_reponse(self):
+ self._login_user()
+ resp = self.client.post(url_for('edit_account'),
+ data=dict(password='valid',
+ password_again='valid'))
+ self.assertIn("Password successfully changed", resp.data)
+
+ def test_admin_edits_user_password_mismatch_warning(self):
self._login_admin()
- res = self.client.post(
+ resp = self.client.post(
url_for('admin_edit_user', user_id=self.user.id),
- data=dict(username='foo', is_admin=False, password='not',
- password_again='thesame'),
+ data=dict(username=self.user.username, is_admin=False,
+ password='not', password_again='thesame'),
follow_redirects=True)
- self.assertIn(escape("Passwords didn't match"), res.data)
+ self.assertIn(escape("Passwords didn't match"), resp.data)
+
+ def test_user_edits_password_mismatch_redirect(self):
+ self._login_user()
+ resp = self.client.post(url_for('edit_account'), data=dict(
+ password='not',
+ password_again='thesame'))
+ self.assertRedirects(resp, url_for('edit_account'))
+
+ def test_admin_add_user_password_mismatch_warning(self):
+ self._login_admin()
+ resp = self.client.post(url_for('admin_add_user'),
+ data=dict(username='dellsberg',
+ password='not',
+ password_again='thesame',
+ is_admin=False))
+ self.assertIn('Passwords didn', resp.data)
+
+ def test_max_password_length(self):
+ """Creating a Journalist with a password that is greater than the
+ maximum password length should raise an exception"""
+ overly_long_password = 'a'*(Journalist.MAX_PASSWORD_LEN + 1)
+ with self.assertRaises(InvalidPasswordLength):
+ temp_journalist = Journalist(
+ username="My Password is Too Big!",
+ password=overly_long_password)
- def test_admin_edits_user_password_too_long(self):
+ def test_admin_edits_user_password_too_long_warning(self):
self._login_admin()
overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1)
- res = self.client.post(
+ resp = self.client.post(
url_for('admin_edit_user', user_id=self.user.id),
- data=dict(username='foo', is_admin=False,
+ data=dict(username=self.user.username, is_admin=False,
password=overly_long_password,
password_again=overly_long_password),
follow_redirects=True)
- self.assertIn('Your password is too long', res.data)
+ self.assertIn('Your password is too long', resp.data)
+
+ def test_user_edits_password_too_long_warning(self):
+ self._login_user()
+ overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1)
+
+ resp = self.client.post(url_for('edit_account'),
+ data=dict(password=overly_long_password,
+ password_again=overly_long_password),
+ follow_redirects=True)
+
+ self.assertIn('Your password is too long', resp.data)
+
+ def test_admin_add_user_password_too_long_warning(self):
+ self._login_admin()
+
+ overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1)
+ resp = self.client.post(
+ url_for('admin_add_user'),
+ data=dict(username='dellsberg', password=overly_long_password,
+ password_again=overly_long_password, is_admin=False))
+
+ self.assertIn('password is too long', resp.data)
def test_admin_edits_user_invalid_username(self):
"""Test expected error message when admin attempts to change a user's
username to a username that is taken by another user."""
self._login_admin()
+ new_username = self.admin.username
- new_username = self.admin_user.username
- res = self.client.post(
+ resp = self.client.post(
url_for('admin_edit_user', user_id=self.user.id),
data=dict(username=new_username, is_admin=False,
- password='', password_again='')
- )
+ password='', password_again=''))
self.assertIn('Username {} is already taken'.format(new_username),
- res.data)
+ resp.data)
- def test_admin_reset_hotp_success(self):
+ def test_admin_resets_user_hotp(self):
self._login_admin()
old_hotp = self.user.hotp.secret
- res = self.client.post(
- url_for('admin_reset_two_factor_hotp'),
- data=dict(uid=self.user.id, otp_secret=123456)
- )
-
+ resp = self.client.post(url_for('admin_reset_two_factor_hotp'),
+ data=dict(uid=self.user.id, otp_secret=123456))
new_hotp = self.user.hotp.secret
+ # check that hotp is different
self.assertNotEqual(old_hotp, new_hotp)
-
- self.assertRedirects(res,
+ # Redirect to admin 2FA view
+ self.assertRedirects(resp,
url_for('admin_new_user_two_factor', uid=self.user.id))
- def test_admin_reset_hotp_empty(self):
- self._login_admin()
- res = self.client.post(
- url_for('admin_reset_two_factor_hotp'),
- data=dict(uid=self.user.id)
- )
+ def test_user_resets_hotp(self):
+ self._login_user()
+ oldHotp = self.user.hotp
+
+ resp = self.client.post(url_for('account_reset_two_factor_hotp'),
+ data=dict(otp_secret=123456))
+ newHotp = self.user.hotp
- self.assertIn('Change Secret', res.data)
+ # check that hotp is different
+ self.assertNotEqual(oldHotp, newHotp)
+ # should redirect to verification page
+ self.assertRedirects(resp, url_for('account_new_two_factor'))
- def test_admin_reset_totp_success(self):
+ def test_admin_resets_user_totp(self):
self._login_admin()
old_totp = self.user.totp
- res = self.client.post(
+ resp = self.client.post(
url_for('admin_reset_two_factor_totp'),
- data=dict(uid=self.user.id)
- )
+ data=dict(uid=self.user.id))
new_totp = self.user.totp
self.assertNotEqual(old_totp, new_totp)
- self.assertRedirects(res,
+ self.assertRedirects(resp,
url_for('admin_new_user_two_factor', uid=self.user.id))
- def test_admin_new_user_2fa_success(self):
- self._login_admin()
-
- res = self.client.post(
- url_for('admin_new_user_two_factor', uid=self.user.id),
- data=dict(token='mocked')
- )
-
- self.assertRedirects(res, url_for('admin_index'))
+ def test_user_resets_totp(self):
+ self._login_user()
+ oldTotp = self.user.totp
- def test_admin_new_user_2fa_get_req(self):
- self._login_admin()
+ resp = self.client.post(url_for('account_reset_two_factor_totp'))
+ newTotp = self.user.totp
- res = self.client.get(
- url_for('admin_new_user_two_factor', uid=self.user.id)
- )
+ # check that totp is different
+ self.assertNotEqual(oldTotp, newTotp)
- # any GET req should take a user to the admin_new_user_two_factor page
- self.assertIn('Authenticator', res.data)
+ # should redirect to verification page
+ self.assertRedirects(resp, url_for('account_new_two_factor'))
- def test_admin_add_user_get_req(self):
+ def test_admin_resets_hotp_with_missing_otp_secret_key(self):
self._login_admin()
+ resp = self.client.post(url_for('admin_reset_two_factor_hotp'),
+ data=dict(uid=self.user.id))
- res = self.client.get(url_for('admin_add_user'))
-
- # any GET req should take a user to the admin_add_user page
- self.assertIn('Add user', res.data)
+ self.assertIn('Change Secret', resp.data)
- def test_admin_add_user_success(self):
+ def test_admin_new_user_2fa_redirect(self):
self._login_admin()
+ resp = self.client.post(
+ url_for('admin_new_user_two_factor', uid=self.user.id),
+ data=dict(token='mocked'))
+ self.assertRedirects(resp, url_for('admin_index'))
- res = self.client.post(
- url_for('admin_add_user'),
- data=dict(username='dellsberg',
- password='pentagonpapers',
- password_again='pentagonpapers',
- is_admin=False)
- )
-
- self.assertRedirects(res, url_for('admin_new_user_two_factor', uid=3))
-
- def test_admin_add_user_failure_no_username(self):
+ def test_http_get_on_admin_new_user_two_factor_page(self):
self._login_admin()
+ resp = self.client.get(url_for('admin_new_user_two_factor', uid=self.user.id))
+ # any GET req should take a user to the admin_new_user_two_factor page
+ self.assertIn('Authenticator', resp.data)
- res = self.client.post(
- url_for('admin_add_user'),
- data=dict(username='', password='pentagonpapers',
- password_again='pentagonpapers', is_admin=False))
-
- self.assertIn('Missing username', res.data)
-
- def test_admin_add_user_failure_passwords_dont_match(self):
+ def test_http_get_on_admin_add_user_page(self):
self._login_admin()
+ resp = self.client.get(url_for('admin_add_user'))
+ # any GET req should take a user to the admin_add_user page
+ self.assertIn('Add user', resp.data)
- res = self.client.post(
- url_for('admin_add_user'),
- data=dict(username='dellsberg', password='not',
- password_again='thesame', is_admin=False))
-
- self.assertIn('Passwords didn', res.data)
-
- def test_admin_add_user_failure_password_too_long(self):
+ def test_admin_add_user(self):
self._login_admin()
+ max_journalist_pk = max([user.id for user in Journalist.query.all()])
- overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1)
- res = self.client.post(
- url_for('admin_add_user'),
- data=dict(username='dellsberg', password=overly_long_password,
- password_again=overly_long_password, is_admin=False))
+ resp = self.client.post(url_for('admin_add_user'),
+ data=dict(username='dellsberg',
+ password='pentagonpapers',
+ password_again='pentagonpapers',
+ is_admin=False))
- self.assertIn('password is too long', res.data)
+ self.assertRedirects(resp, url_for('admin_new_user_two_factor',
+ uid=max_journalist_pk+1))
- def test_admin_authorization_for_gets(self):
+ def test_admin_add_user_without_username(self):
+ self._login_admin()
+ resp = self.client.post(url_for('admin_add_user'),
+ data=dict(username='',
+ password='pentagonpapers',
+ password_again='pentagonpapers',
+ is_admin=False))
+ self.assertIn('Missing username', resp.data)
+
+ def test_admin_page_restriction_http_gets(self):
admin_urls = [url_for('admin_index'), url_for('admin_add_user'),
url_for('admin_edit_user', user_id=self.user.id)]
self._login_user()
for admin_url in admin_urls:
- res = self.client.get(admin_url)
- self.assertStatus(res, 302)
+ resp = self.client.get(admin_url)
+ self.assertStatus(resp, 302)
- def test_admin_authorization_for_posts(self):
+ def test_admin_page_restrction_http_posts(self):
admin_urls = [url_for('admin_reset_two_factor_totp'),
url_for('admin_reset_two_factor_hotp'),
url_for('admin_add_user', user_id=self.user.id),
@@ -361,8 +397,8 @@ def test_admin_authorization_for_posts(self):
url_for('admin_delete_user', user_id=self.user.id)]
self._login_user()
for admin_url in admin_urls:
- res = self.client.post(admin_url)
- self.assertStatus(res, 302)
+ resp = self.client.post(admin_url)
+ self.assertStatus(resp, 302)
def test_user_authorization_for_gets(self):
urls = [url_for('index'), url_for('col', sid='1'),
@@ -370,8 +406,8 @@ def test_user_authorization_for_gets(self):
url_for('edit_account')]
for url in urls:
- res = self.client.get(url)
- self.assertStatus(res, 302)
+ resp = self.client.get(url)
+ self.assertStatus(resp, 302)
def test_user_authorization_for_posts(self):
urls = [url_for('add_star', sid='1'), url_for('remove_star', sid='1'),
@@ -381,265 +417,171 @@ def test_user_authorization_for_posts(self):
url_for('account_reset_two_factor_totp'),
url_for('account_reset_two_factor_hotp')]
for url in urls:
- res = self.client.post(url)
- self.assertStatus(res, 302)
-
- def test_invalid_user_password_change(self):
- self._login_user()
- res = self.client.post(url_for('edit_account'), data=dict(
- password='not',
- password_again='thesame'))
- self.assertRedirects(res, url_for('edit_account'))
-
- def test_too_long_user_password_change(self):
- self._login_user()
- overly_long_password = 'a' * (Journalist.MAX_PASSWORD_LEN + 1)
-
- res = self.client.post(url_for('edit_account'), data=dict(
- password=overly_long_password,
- password_again=overly_long_password),
- follow_redirects=True)
-
- self.assertIn('Your password is too long', res.data)
-
- def test_valid_user_password_change(self):
- self._login_user()
- res = self.client.post(url_for('edit_account'), data=dict(
- password='valid',
- password_again='valid'))
- self.assertIn("Password successfully changed", res.data)
-
- def test_regenerate_totp(self):
- self._login_user()
- oldTotp = self.user.totp
-
- res = self.client.post(url_for('account_reset_two_factor_totp'))
- newTotp = self.user.totp
-
- # check that totp is different
- self.assertNotEqual(oldTotp, newTotp)
-
- # should redirect to verification page
- self.assertRedirects(res, url_for('account_new_two_factor'))
-
- def test_edit_hotp(self):
- self._login_user()
- oldHotp = self.user.hotp
-
- res = self.client.post(
- url_for('account_reset_two_factor_hotp'),
- data=dict(otp_secret=123456)
- )
- newHotp = self.user.hotp
-
- # check that hotp is different
- self.assertNotEqual(oldHotp, newHotp)
-
- # should redirect to verification page
- self.assertRedirects(res, url_for('account_new_two_factor'))
-
- def test_delete_source_deletes_submissions(self):
- """Verify that when a source is deleted, the submissions that
- correspond to them are also deleted."""
-
- source, files = self.add_source_and_submissions()
-
- journalist.delete_collection(source.filesystem_id)
-
- # Source should be gone
- results = db_session.query(Source).filter(Source.id == source.id).all()
- self.assertEqual(results, [])
-
- # Submissions should be gone
- results = db_session.query(Submission.source_id == source.id).all()
+ resp = self.client.post(url)
+ self.assertStatus(resp, 302)
+
+ def _delete_collection_setup(self):
+ self.source, _ = TestSource.init_source()
+ TestSubmission.submit(self.source, 2)
+ TestReply.reply(self.user, self.source, 2)
+
+ def test_delete_collection_updates_db(self):
+ """Verify that when a source is deleted, their Source identity
+ record, as well as Reply & Submission records associated with
+ that record are purged from the database."""
+ self._delete_collection_setup()
+ journalist.delete_collection(self.source.filesystem_id)
+ results = Source.query.filter(Source.id == self.source.id).all()
self.assertEqual(results, [])
-
- def test_delete_source_deletes_replies(self):
- """Verify that when a source is deleted, the replies that
- correspond to them are also deleted."""
-
- source, files = self.add_source_and_replies()
-
- journalist.delete_collection(source.filesystem_id)
-
- # Source should be gone
- results = db_session.query(Source).filter(Source.id == source.id).all()
+ results = db_session.query(Submission.source_id == self.source.id).all()
self.assertEqual(results, [])
-
- # Replies should be gone
- results = db_session.query(Reply.source_id == source.id).all()
+ results = db_session.query(Reply.source_id == self.source.id).all()
self.assertEqual(results, [])
def test_delete_source_deletes_source_key(self):
"""Verify that when a source is deleted, the PGP key that corresponds
to them is also deleted."""
-
- source, files = self.add_source_and_submissions()
-
+ self._delete_collection_setup()
# Source key exists
- source_key = crypto_util.getkey(source.filesystem_id)
+ source_key = crypto_util.getkey(self.source.filesystem_id)
self.assertNotEqual(source_key, None)
- journalist.delete_collection(source.filesystem_id)
+ journalist.delete_collection(self.source.filesystem_id)
# Source key no longer exists
- source_key = crypto_util.getkey(source.filesystem_id)
+ source_key = crypto_util.getkey(self.source.filesystem_id)
self.assertEqual(source_key, None)
def test_delete_source_deletes_docs_on_disk(self):
"""Verify that when a source is deleted, the encrypted documents that
exist on disk is also deleted."""
-
- source, files = self.add_source_and_submissions()
-
+ self._delete_collection_setup()
# Encrypted documents exists
- dir_source_docs = os.path.join(config.STORE_DIR, source.filesystem_id)
-
+ dir_source_docs = os.path.join(config.STORE_DIR, self.source.filesystem_id)
self.assertTrue(os.path.exists(dir_source_docs))
- job = journalist.delete_collection(source.filesystem_id)
+ job = journalist.delete_collection(self.source.filesystem_id)
- # Block for up to 5s to await asynchronous Redis job result
- timeout = datetime.datetime.now() + datetime.timedelta(0,5)
- while 1:
- if job.result == "success":
- break
- elif datetime.datetime.now() > timeout:
- self.assertTrue(False)
+ # Wait up to 5s to wait for Redis worker `srm` operation to complete
+ Async.wait_for_redis_worker(job)
# Encrypted documents no longer exist
- dir_source_docs = os.path.join(config.STORE_DIR, source.filesystem_id)
self.assertFalse(os.path.exists(dir_source_docs))
- def test_download_selected_from_source(self):
- sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R'
- source = Source(sid, crypto_util.display_id())
- db_session.add(source)
- db_session.commit()
- files = ['1-abc1-msg.gpg', '2-abc2-msg.gpg', '3-abc3-msg.gpg', '4-abc4-msg.gpg']
- selected_files = files[:2]
- unselected_files = files[2:]
- common.setup_test_docs(sid, files)
+ def test_download_selected_submissions_from_source(self):
+ source, _ = TestSource.init_source()
+ submissions = set(TestSubmission.submit(source, 4))
- self._login_user()
- rv = self.client.post('/bulk',
- data=dict(action='download', sid=sid,
- doc_names_selected=selected_files))
-
- self.assertEqual(rv.status_code, 200)
- self.assertEqual(rv.content_type, 'application/zip')
- self.assertTrue(zipfile.is_zipfile(StringIO(rv.data)))
-
- for file in selected_files:
- self.assertTrue(
- zipfile.ZipFile(StringIO(rv.data)).getinfo(
- os.path.join(source.journalist_filename, file))
- )
+ selected_submissions = random.sample(submissions, 2)
+ selected_fnames = [submission.filename
+ for submission in selected_submissions]
- for file in unselected_files:
+ self._login_user()
+ resp = self.client.post(
+ '/bulk', data=dict(action='download',
+ sid=source.filesystem_id,
+ doc_names_selected=selected_fnames))
+ # The download request was succesful, and the app returned a zipfile
+ self.assertEqual(resp.status_code, 200)
+ self.assertEqual(resp.content_type, 'application/zip')
+ self.assertTrue(zipfile.is_zipfile(StringIO(resp.data)))
+ # The submissions selected are in the zipfile
+ for filename in selected_fnames:
+ self.assertTrue(zipfile.ZipFile(StringIO(resp.data)).getinfo(
+ os.path.join(source.journalist_filename, filename)))
+ # The submissions not selected are absent from the zipfile
+ not_selected_submissions = submissions.difference(selected_submissions)
+ not_selected_fnames = [submission.filename
+ for submission in not_selected_submissions]
+ for filename in not_selected_fnames:
try:
- zipfile.ZipFile(StringIO(rv.data)).getinfo(
- os.path.join(source.journalist_filename, file))
+ zipfile.ZipFile(StringIO(resp.data)).getinfo(
+ os.path.join(source.journalist_filename, filename))
except KeyError:
pass
else:
self.assertTrue(False)
- def _setup_two_sources(self):
- # Add two sources to the database
- self.sid = crypto_util.hash_codename(crypto_util.genrandomid())
- self.sid2 = crypto_util.hash_codename(crypto_util.genrandomid())
- self.source = Source(self.sid, crypto_util.display_id())
- self.source2 = Source(self.sid2, crypto_util.display_id())
- db_session.add(self.source)
- db_session.add(self.source2)
- db_session.commit()
-
- # The sources have made two submissions each, both being messages
- self.files = ['1-s1-msg.gpg', '2-s1-msg.gpg']
- common.setup_test_docs(self.sid, self.files)
- self.files2 = ['1-s2-msg.gpg', '2-s2-msg.gpg']
- common.setup_test_docs(self.sid2, self.files2)
-
- # Mark the first message for each of the two sources read
- for fn in (self.files[0], self.files2[0]):
- s = Submission.query.filter(Submission.filename == fn).one()
- s.downloaded = True
- db_session.commit()
-
-
-
- def test_download_unread_selected_sources(self):
- self._setup_two_sources()
+ def _bulk_download_setup(self):
+ """Create a couple sources, make some submissions on their behalf,
+ mark some of them as downloaded, and then perform *action* on all
+ sources."""
+ self.source0, _ = TestSource.init_source()
+ self.source1, _ = TestSource.init_source()
+ self.submissions0 = set(TestSubmission.submit(self.source0, 2))
+ self.submissions1 = set(TestSubmission.submit(self.source1, 3))
+ self.downloaded0 = random.sample(self.submissions0, 1)
+ TestSubmission.mark_downloaded(*self.downloaded0)
+ self.not_downloaded0 = self.submissions0.difference(self.downloaded0)
+ self.downloaded1 = random.sample(self.submissions1, 2)
+ TestSubmission.mark_downloaded(*self.downloaded1)
+ self.not_downloaded1 = self.submissions1.difference(self.downloaded1)
+
+
+ def test_download_unread_all_sources(self):
+ self._bulk_download_setup()
self._login_user()
- resp = self.client.post('/col/process',
- data=dict(action='download-unread',
- cols_selected=[self.sid, self.sid2]))
-
- self.assertEqual(resp.status_code, 200)
- self.assertEqual(resp.content_type, 'application/zip')
- self.assertTrue(zipfile.is_zipfile(StringIO(resp.data)))
-
- for file in (self.files[1], self.files2[1]):
+ # Download all unread messages from all sources
+ self.resp = self.client.post(
+ '/col/process',
+ data=dict(action='download-unread',
+ cols_selected=[self.source0.filesystem_id,
+ self.source1.filesystem_id]))
+
+ # The download request was succesful, and the app returned a zipfile
+ self.assertEqual(self.resp.status_code, 200)
+ self.assertEqual(self.resp.content_type, 'application/zip')
+ self.assertTrue(zipfile.is_zipfile(StringIO(self.resp.data)))
+ # All the not dowloaded submissions are in the zipfile
+ for submission in self.not_downloaded0.union(self.not_downloaded1):
self.assertTrue(
- zipfile.ZipFile(StringIO(resp.data)).getinfo(
- os.path.join('unread', file))
+ zipfile.ZipFile(StringIO(self.resp.data)).getinfo(
+ os.path.join('unread', submission.filename))
)
-
- for file in (self.files[0], self.files2[0]):
+ # All the downloaded submissions are absent from the zipfile
+ for submission in self.downloaded0 + self.downloaded1:
try:
- zipfile.ZipFile(StringIO(resp.data)).getinfo(
- os.path.join('unread', file))
+ zipfile.ZipFile(StringIO(self.resp.data)).getinfo(
+ os.path.join('unread', submission.filename))
except KeyError:
pass
else:
self.assertTrue(False)
def test_download_all_selected_sources(self):
- self._setup_two_sources()
+ self._bulk_download_setup()
self._login_user()
- resp = self.client.post('/col/process',
- data=dict(action='download-all',
- cols_selected=[self.sid2]))
-
- self.assertEqual(resp.status_code, 200)
- self.assertEqual(resp.content_type, 'application/zip')
- self.assertTrue(zipfile.is_zipfile(StringIO(resp.data)))
-
- for file in self.files2:
+ # Dowload all messages from self.source1
+ self.resp = self.client.post(
+ '/col/process',
+ data=dict(action='download-all',
+ cols_selected=[self.source1.filesystem_id]))
+
+ # The download request was succesful, and the app returned a zipfile
+ self.assertEqual(self.resp.status_code, 200)
+ self.assertEqual(self.resp.content_type, 'application/zip')
+ self.assertTrue(zipfile.is_zipfile(StringIO(self.resp.data)))
+ # All messages from self.source1 are in the zipfile
+ for submission in self.submissions1:
self.assertTrue(
- zipfile.ZipFile(StringIO(resp.data)).getinfo(
- os.path.join('all', file))
+ zipfile.ZipFile(StringIO(self.resp.data)).getinfo(
+ os.path.join('all', submission.filename))
)
-
- for file in self.files:
+ # All messages from self.source2 are absent from the zipfile
+ for submission in self.submissions0:
try:
- zipfile.ZipFile(StringIO(resp.data)).getinfo(
- os.path.join('all', file))
+ zipfile.ZipFile(StringIO(self.resp.data)).getinfo(
+ os.path.join('all', submission.filename))
except KeyError:
pass
else:
self.assertTrue(False)
- def test_max_password_length(self):
- """Creating a Journalist with a password that is greater than the
- maximum password length should raise an exception"""
- overly_long_password = 'a'*(Journalist.MAX_PASSWORD_LEN + 1)
- with self.assertRaises(InvalidPasswordLength):
- temp_journalist = Journalist(
- username="My Password is Too Big!",
- password=overly_long_password)
-
- def test_add_star(self):
+ def test_add_star_redirects_to_index(self):
+ source, _ = TestSource.init_source()
self._login_user()
-
- sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R'
- source = Source(sid, crypto_util.display_id())
- db_session.add(source)
- db_session.commit()
-
- res = self.client.post(url_for('add_star', sid=sid))
- self.assertRedirects(res, url_for('index'))
+ resp = self.client.post(url_for('add_star', sid=source.filesystem_id))
+ self.assertRedirects(resp, url_for('index'))
if __name__ == "__main__":
diff --git a/securedrop/tests/test_unit_source.py b/securedrop/tests/test_unit_source.py
index 77eabfaa6a2..b50593a7d39 100644
--- a/securedrop/tests/test_unit_source.py
+++ b/securedrop/tests/test_unit_source.py
@@ -1,30 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import common
+from bs4 import BeautifulSoup
+from cStringIO import StringIO
+from flask import session, escape
+from flask_testing import TestCase
+from mock import patch, ANY
import os
import re
import unittest
-from cStringIO import StringIO
-from bs4 import BeautifulSoup
-from flask_testing import TestCase
-from flask import session, escape
-from mock import patch, ANY
-import source
-from db import Source
+# Set environment variable so config.py uses a test environment
os.environ['SECUREDROP_ENV'] = 'test'
+from common import SetUp, TearDown, TestSource
+from db import Source
+import source
+
-class TestSource(TestCase):
+class TestSourceApp(TestCase):
def create_app(self):
return source.app
def setUp(self):
- common.shared_setup()
+ SetUp.setup()
def tearDown(self):
- common.shared_teardown()
+ TearDown.teardown()
def test_index(self):
"""Test that the landing page loads and looks how we expect"""
@@ -44,14 +46,14 @@ def _find_codename(self, html):
def test_generate(self):
with self.client as c:
- rv = c.get('/generate')
- self.assertEqual(rv.status_code, 200)
+ resp = c.get('/generate')
+ self.assertEqual(resp.status_code, 200)
session_codename = session['codename']
- self.assertIn("Remember this codename and keep it secret", rv.data)
+ self.assertIn("Remember this codename and keep it secret", resp.data)
self.assertIn(
"To protect your identity, we're assigning you a unique codename.",
- rv.data)
- codename = self._find_codename(rv.data)
+ resp.data)
+ codename = self._find_codename(resp.data)
# default codename length is 7 words
self.assertEqual(len(codename.split()), 7)
# codename is also stored in the session - make sure it matches the
@@ -80,90 +82,90 @@ def test_generate_has_login_link(self):
"""The generate page should have a link to remind people to login
if they already have a codename, rather than create a new one.
"""
- rv = self.client.get('/generate')
- self.assertIn("Already have a codename?", rv.data)
- soup = BeautifulSoup(rv.data)
+ resp = self.client.get('/generate')
+ self.assertIn("Already have a codename?", resp.data)
+ soup = BeautifulSoup(resp.data)
already_have_codename_link = soup.select('a#already-have-codename')[0]
self.assertEqual(already_have_codename_link['href'], '/login')
def test_generate_already_logged_in(self):
self._new_codename()
# Make sure it redirects to /lookup when logged in
- rv = self.client.get('/generate')
- self.assertEqual(rv.status_code, 302)
+ resp = self.client.get('/generate')
+ self.assertEqual(resp.status_code, 302)
# Make sure it flashes the message on the lookup page
- rv = self.client.get('/generate', follow_redirects=True)
+ resp = self.client.get('/generate', follow_redirects=True)
# Should redirect to /lookup
- self.assertEqual(rv.status_code, 200)
- self.assertIn("because you are already logged in.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("because you are already logged in.", resp.data)
def test_create(self):
with self.client as c:
- rv = c.get('/generate')
+ resp = c.get('/generate')
codename = session['codename']
- rv = c.post('/create', follow_redirects=True)
+ resp = c.post('/create', follow_redirects=True)
self.assertTrue(session['logged_in'])
# should be redirected to /lookup
- self.assertIn("Submit documents and messages", rv.data)
+ self.assertIn("Submit documents and messages", resp.data)
def _new_codename(self):
- return common.new_codename(self.client, session)
+ return TestSource.new_codename(self.client, session)
def test_lookup(self):
- """Test various elements on the /lookup page"""
+ """Test various elements on the /lookup page."""
codename = self._new_codename()
- rv = self.client.post('login', data=dict(codename=codename),
- follow_redirects=True)
+ resp = self.client.post('login', data=dict(codename=codename),
+ follow_redirects=True)
# redirects to /lookup
- self.assertIn("public key", rv.data)
+ self.assertIn("public key", resp.data)
# download the public key
- rv = self.client.get('journalist-key')
- self.assertIn("BEGIN PGP PUBLIC KEY BLOCK", rv.data)
+ resp = self.client.get('journalist-key')
+ self.assertIn("BEGIN PGP PUBLIC KEY BLOCK", resp.data)
def test_login_and_logout(self):
- rv = self.client.get('/login')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Login to check for responses", rv.data)
+ resp = self.client.get('/login')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Login to check for responses", resp.data)
codename = self._new_codename()
with self.client as c:
- rv = c.post('/login', data=dict(codename=codename),
+ resp = c.post('/login', data=dict(codename=codename),
follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Submit documents and messages", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Submit documents and messages", resp.data)
self.assertTrue(session['logged_in'])
- common.logout(c)
+ resp = c.get('/logout', follow_redirects=True)
with self.client as c:
- rv = self.client.post('/login', data=dict(codename='invalid'),
+ resp = self.client.post('/login', data=dict(codename='invalid'),
follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn('Sorry, that is not a recognized codename.', rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn('Sorry, that is not a recognized codename.', resp.data)
self.assertNotIn('logged_in', session)
with self.client as c:
- rv = c.post('/login', data=dict(codename=codename),
+ resp = c.post('/login', data=dict(codename=codename),
follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
self.assertTrue(session['logged_in'])
- rv = c.get('/logout', follow_redirects=True)
+ resp = c.get('/logout', follow_redirects=True)
self.assertTrue(not session)
- self.assertIn('Thank you for logging out.', rv.data)
+ self.assertIn('Thank you for logging out.', resp.data)
def test_login_with_whitespace(self):
"""Test that codenames with leading or trailing whitespace still work"""
def login_test(codename):
- rv = self.client.get('/login')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Login to check for responses", rv.data)
+ resp = self.client.get('/login')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Login to check for responses", resp.data)
with self.client as c:
- rv = c.post('/login', data=dict(codename=codename),
+ resp = c.post('/login', data=dict(codename=codename),
follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Submit documents and messages", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Submit documents and messages", resp.data)
self.assertTrue(session['logged_in'])
- common.logout(c)
+ resp = c.get('/logout', follow_redirects=True)
codename = self._new_codename()
login_test(codename + ' ')
@@ -188,29 +190,29 @@ def test_initial_submission_notification(self):
reminding sources to check back later for replies.
"""
self._new_codename()
- rv = self._dummy_submission()
- self.assertEqual(rv.status_code, 200)
+ resp = self._dummy_submission()
+ self.assertEqual(resp.status_code, 200)
self.assertIn(
"Thanks for submitting something to SecureDrop! Please check back later for replies.",
- rv.data)
+ resp.data)
def test_submit_message(self):
self._new_codename()
self._dummy_submission()
- rv = self.client.post('/submit', data=dict(
+ resp = self.client.post('/submit', data=dict(
msg="This is a test.",
fh=(StringIO(''), ''),
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Thanks! We received your message.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Thanks! We received your message.", resp.data)
def test_submit_empty_message(self):
self._new_codename()
- rv = self.client.post('/submit', data=dict(
+ resp = self.client.post('/submit', data=dict(
msg="",
fh=(StringIO(''), ''),
), follow_redirects=True)
- self.assertIn("You must enter a message or choose a file to submit.", rv.data)
+ self.assertIn("You must enter a message or choose a file to submit.", resp.data)
def test_submit_big_message(self):
'''
@@ -220,43 +222,43 @@ def test_submit_big_message(self):
'''
self._new_codename()
self._dummy_submission()
- rv = self.client.post('/submit', data=dict(
+ resp = self.client.post('/submit', data=dict(
msg="AA" * (1024 * 512),
fh=(StringIO(''), ''),
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Thanks! We received your message.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Thanks! We received your message.", resp.data)
def test_submit_file(self):
self._new_codename()
self._dummy_submission()
- rv = self.client.post('/submit', data=dict(
+ resp = self.client.post('/submit', data=dict(
msg="",
fh=(StringIO('This is a test'), 'test.txt'),
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
+ self.assertEqual(resp.status_code, 200)
self.assertIn(
escape(
'{} "{}"'.format(
"Thanks! We received your document",
"test.txt")),
- rv.data)
+ resp.data)
def test_submit_both(self):
self._new_codename()
self._dummy_submission()
- rv = self.client.post('/submit', data=dict(
+ resp = self.client.post('/submit', data=dict(
msg="This is a test",
fh=(StringIO('This is a test'), 'test.txt'),
), follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Thanks! We received your message.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Thanks! We received your message.", resp.data)
self.assertIn(
escape(
'{} "{}"'.format(
"Thanks! We received your document",
'test.txt')),
- rv.data)
+ resp.data)
@patch('gzip.GzipFile')
def test_submit_sanitizes_filename(self, gzipfile):
@@ -274,24 +276,24 @@ def test_submit_sanitizes_filename(self, gzipfile):
fileobj=ANY)
def test_tor2web_warning_headers(self):
- rv = self.client.get('/', headers=[('X-tor2web', 'encrypted')])
- self.assertEqual(rv.status_code, 200)
- self.assertIn("You appear to be using Tor2Web.", rv.data)
+ resp = self.client.get('/', headers=[('X-tor2web', 'encrypted')])
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("You appear to be using Tor2Web.", resp.data)
def test_tor2web_warning(self):
- rv = self.client.get('/tor2web-warning')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Why is there a warning about Tor2Web?", rv.data)
+ resp = self.client.get('/tor2web-warning')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Why is there a warning about Tor2Web?", resp.data)
def test_why_journalist_key(self):
- rv = self.client.get('/why-journalist-key')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Why download the journalist's public key?", rv.data)
+ resp = self.client.get('/why-journalist-key')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Why download the journalist's public key?", resp.data)
def test_howto_disable_js(self):
- rv = self.client.get('/howto-disable-js')
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Disable JavaScript to Protect Your Anonymity", rv.data)
+ resp = self.client.get('/howto-disable-js')
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Disable JavaScript to Protect Your Anonymity", resp.data)
@patch('crypto_util.hash_codename')
def test_login_with_overly_long_codename(self, mock_hash_codename):
@@ -299,12 +301,12 @@ def test_login_with_overly_long_codename(self, mock_hash_codename):
an error, and scrypt should not be called to avoid DoS."""
overly_long_codename = 'a' * (Source.MAX_CODENAME_LEN + 1)
with self.client as client:
- rv = client.post(
+ resp = client.post(
'/login',
data=dict(codename=overly_long_codename),
follow_redirects=True)
- self.assertEqual(rv.status_code, 200)
- self.assertIn("Sorry, that is not a recognized codename.", rv.data)
+ self.assertEqual(resp.status_code, 200)
+ self.assertIn("Sorry, that is not a recognized codename.", resp.data)
self.assertFalse(mock_hash_codename.called,
"Called hash_codename for codename w/ invalid length")
diff --git a/securedrop/tests/test_unit_store.py b/securedrop/tests/test_unit_store.py
index fc8f66cc3fe..8b35b8483a0 100644
--- a/securedrop/tests/test_unit_store.py
+++ b/securedrop/tests/test_unit_store.py
@@ -1,15 +1,18 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+
import os
import unittest
import zipfile
+
# Set environment variable so config.py uses a test environment
os.environ['SECUREDROP_ENV'] = 'test'
+
+import crypto_util
+from common import SetUp, TearDown, TestSource, TestSubmission
import config
-import store
-import common
from db import db_session, Source
-import crypto_util
+import store
class TestStore(unittest.TestCase):
@@ -17,10 +20,11 @@ class TestStore(unittest.TestCase):
"""The set of tests for store.py."""
def setUp(self):
- common.shared_setup()
+ SetUp.setup()
def tearDown(self):
- common.shared_teardown()
+ TearDown.teardown()
+ db_session.remove()
def test_verify(self):
with self.assertRaises(store.PathException):
@@ -29,16 +33,14 @@ def test_verify(self):
store.verify(config.STORE_DIR + "_backup")
def test_get_zip(self):
- sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R'
- source = Source(sid, crypto_util.display_id())
- db_session.add(source)
- db_session.commit()
-
- files = ['1-abc1-msg.gpg', '2-abc2-msg.gpg']
- filenames = common.setup_test_docs(sid, files)
+ source, _ = TestSource.init_source()
+ submissions = TestSubmission.submit(source, 2)
+ filenames = [os.path.join(config.STORE_DIR,
+ source.filesystem_id,
+ submission.filename)
+ for submission in submissions]
archive = zipfile.ZipFile(store.get_bulk_archive(filenames))
-
archivefile_contents = archive.namelist()
for archived_file, actual_file in zip(archivefile_contents, filenames):
@@ -47,19 +49,14 @@ def test_get_zip(self):
self.assertEquals(zipped_file_content, actual_file_content)
def test_rename_valid_submission(self):
- sid = 'EQZGCJBRGISGOTC2NZVWG6LILJBHEV3CINNEWSCLLFTUWZJPKJFECLS2NZ4G4U3QOZCFKTTPNZMVIWDCJBBHMUDBGFHXCQ3R'
-
- source_dir = os.path.join(config.STORE_DIR, sid)
- if not os.path.exists(source_dir):
- os.makedirs(source_dir)
-
- old_filename = '1-abc1-msg.gpg'
- open(os.path.join(source_dir, old_filename), 'w').close()
-
- new_filestem = 'abc2'
- expected_filename = '1-abc2-msg.gpg'
- actual_filename = store.rename_submission(sid, old_filename,
- new_filestem)
+ source, _ = TestSource.init_source()
+ old_journalist_filename = source.journalist_filename
+ old_filename = TestSubmission.submit(source, 1)[0].filename
+ new_journalist_filename = 'nestor_makhno'
+ expected_filename = old_filename.replace(old_journalist_filename,
+ new_journalist_filename)
+ actual_filename = store.rename_submission(source.filesystem_id, old_filename,
+ new_journalist_filename)
self.assertEquals(actual_filename, expected_filename)
if __name__ == "__main__":