Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move metadata sync to api queue #640

Merged
merged 3 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion securedrop_client/api_jobs/downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.db import File, Message, Reply
from securedrop_client.storage import mark_as_decrypted, mark_as_downloaded, \
set_message_or_reply_content
set_message_or_reply_content, get_remote_data, update_local_storage

logger = logging.getLogger(__name__)

Expand All @@ -31,6 +31,49 @@ def __init__(self, message: str,
self.uuid = uuid


class MetadataSyncJob(ApiJob):
    '''
    Update source metadata such that new download jobs can be added to the queue.
    '''

    def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
        super().__init__()
        self.data_dir = data_dir  # directory where downloaded data is stored
        self.gpg = gpg  # helper used to import source public keys

    def call_api(self, api_client: API, session: Session) -> Any:
        '''
        Override ApiJob.

        Fetch the latest sources, submissions, and replies from the server,
        persist them to the local database, and import any new source PGP
        keys. Afterwards the job's success signal lets the controller know to
        enqueue any new download jobs.
        '''
        remote_sources, remote_submissions, remote_replies = \
            get_remote_data(api_client)

        update_local_storage(session,
                             remote_sources,
                             remote_submissions,
                             remote_replies,
                             self.data_dir)

        for source in remote_sources:
            key_info = source.key
            # Only PGP keys are importable; skip sources without one.
            if not key_info or key_info.get('type') != 'PGP':
                continue
            public_key = key_info.get('public')
            fingerprint = key_info.get('fingerprint')
            if not public_key or not fingerprint:
                # The below line needs to be excluded from the coverage computation
                # as it will show as uncovered due to a cpython compiler optimization.
                # See: https://bugs.python.org/issue2506
                continue  # pragma: no cover
            try:
                self.gpg.import_key(source.uuid, public_key, fingerprint)
            except CryptoError:
                # A bad key for one source must not abort the whole sync.
                logger.warning('Failed to import key for source {}'.format(source.uuid))


class DownloadJob(ApiJob):
'''
Download and decrypt a file that contains either a message, reply, or file submission.
Expand Down
45 changes: 14 additions & 31 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
from securedrop_client import storage
from securedrop_client import db
from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob, \
ReplyDownloadJob, DownloadChecksumMismatchException
ReplyDownloadJob, DownloadChecksumMismatchException, MetadataSyncJob
from securedrop_client.api_jobs.uploads import SendReplyJob, SendReplyJobError, \
SendReplyJobTimeoutError
from securedrop_client.api_jobs.updatestar import UpdateStarJob, UpdateStarJobException
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.crypto import GpgHelper
from securedrop_client.export import Export
from securedrop_client.queue import ApiJobQueue
from securedrop_client.utils import check_dir_permissions
Expand Down Expand Up @@ -326,8 +326,8 @@ def on_authenticate_success(self, result):
self.session)
self.gui.show_main_window(user)
self.update_sources()
self.sync_api()
self.api_job_queue.login(self.api)
self.sync_api()
self.is_authenticated = True
self.resume_queues()

Expand Down Expand Up @@ -371,11 +371,14 @@ def sync_api(self):

if self.authenticated():
logger.debug("You are authenticated, going to make your call")
self.call_api(storage.get_remote_data,
self.on_sync_success,
self.on_sync_failure,
self.api)
logger.debug("In sync_api, after call to call_api, on "

job = MetadataSyncJob(self.data_dir, self.gpg)

job.success_signal.connect(self.on_sync_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_sync_failure, type=Qt.QueuedConnection)
self.api_job_queue.enqueue(job)

logger.debug("In sync_api, after call to submit job to queue, on "
"thread {}".format(self.thread().currentThreadId()))

def last_sync(self):
Expand All @@ -388,38 +391,18 @@ def last_sync(self):
except Exception:
return None

def on_sync_success(self, result) -> None:
def on_sync_success(self) -> None:
"""
Called when syncronisation of data via the API succeeds.
Called when syncronisation of data via the API queue succeeds.

* Update db with new metadata
* Set last sync flag
* Import keys into keyring
* Display the last sync time and updated list of sources in GUI
* Download new messages and replies
* Update missing files so that they can be re-downloaded
"""
remote_sources, remote_submissions, remote_replies = result
storage.update_local_storage(self.session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

with open(self.sync_flag, 'w') as f:
f.write(arrow.now().format())

for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
continue
try:
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))

storage.update_missing_files(self.data_dir, self.session)
self.update_sources()
self.download_new_messages()
Expand All @@ -428,7 +411,7 @@ def on_sync_success(self, result) -> None:

def on_sync_failure(self, result: Exception) -> None:
"""
Called when syncronisation of data via the API fails.
Called when syncronisation of data via the API queue fails.
"""
self.gui.update_error_status(
_('The SecureDrop server cannot be reached.'),
Expand Down
4 changes: 2 additions & 2 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from securedrop_client.api_jobs.base import ApiJob, ApiInaccessibleError, DEFAULT_NUM_ATTEMPTS, \
PauseQueueJob
from securedrop_client.api_jobs.downloads import (FileDownloadJob, MessageDownloadJob,
ReplyDownloadJob)
ReplyDownloadJob, MetadataSyncJob)
from securedrop_client.api_jobs.uploads import SendReplyJob
from securedrop_client.api_jobs.updatestar import UpdateStarJob

Expand Down Expand Up @@ -41,7 +41,7 @@ class RunnableQueue(QObject):
JOB_PRIORITIES = {
# TokenInvalidationJob: 10, # Not yet implemented
PauseQueueJob: 11,
# MetadataSyncJob: 12, # Not yet implemented
MetadataSyncJob: 12,
FileDownloadJob: 13, # File downloads processed in separate queue
MessageDownloadJob: 13,
ReplyDownloadJob: 13,
Expand Down
103 changes: 102 additions & 1 deletion tests/api_jobs/test_downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
from sdclientapi import Submission as SdkSubmission

from securedrop_client.api_jobs.downloads import DownloadJob, FileDownloadJob, MessageDownloadJob, \
ReplyDownloadJob, DownloadChecksumMismatchException
ReplyDownloadJob, DownloadChecksumMismatchException, MetadataSyncJob
from securedrop_client.crypto import GpgHelper, CryptoError
from tests import factory

with open(os.path.join(os.path.dirname(__file__), '..', 'files', 'test-key.gpg.pub.asc')) as f:
PUB_KEY = f.read()


def patch_decrypt(mocker, homedir, gpghelper, filename):
mock_decrypt = mocker.patch.object(gpghelper, 'decrypt_submission_or_reply')
Expand All @@ -18,6 +21,104 @@ def patch_decrypt(mocker, homedir, gpghelper, filename):
return mock_decrypt


def test_MetadataSyncJob_success(mocker, homedir, session, session_maker):
    """
    A sync fetches remote data once and imports the source's PGP key with the
    uuid, public key, and fingerprint supplied by the server.
    """
    source = mocker.MagicMock()
    source.uuid = 'bar'
    source.key = {
        'type': 'PGP',
        'public': PUB_KEY,
        'fingerprint': '123456ABC',
    }
    remote_data = ([source], 'submissions', 'replies')

    gpg = GpgHelper(homedir, session_maker, is_qubes=False)
    job = MetadataSyncJob(homedir, gpg)

    import_key_mock = mocker.patch.object(job.gpg, 'import_key')
    get_remote_data_mock = mocker.patch(
        'securedrop_client.api_jobs.downloads.get_remote_data',
        return_value=remote_data)
    mocker.patch(
        'securedrop_client.api_jobs.downloads.update_local_storage',
        return_value=remote_data)

    job.call_api('foo', session)

    assert get_remote_data_mock.call_count == 1
    uuid, public_key, fingerprint = import_key_mock.call_args[0]
    assert uuid == source.uuid
    assert public_key == source.key['public']
    assert fingerprint == source.key['fingerprint']


def test_MetadataSyncJob_success_with_key_import_fail(mocker, homedir, session, session_maker):
    """
    Check that we can gracefully handle a key import failure.
    """
    source = mocker.MagicMock()
    source.uuid = 'bar'
    source.key = {
        'type': 'PGP',
        'public': PUB_KEY,
        'fingerprint': '123456ABC',
    }
    remote_data = ([source], 'submissions', 'replies')

    gpg = GpgHelper(homedir, session_maker, is_qubes=False)
    job = MetadataSyncJob(homedir, gpg)

    # Simulate a bad key: the job must swallow the CryptoError and finish.
    import_key_mock = mocker.patch.object(job.gpg, 'import_key',
                                          side_effect=CryptoError)
    get_remote_data_mock = mocker.patch(
        'securedrop_client.api_jobs.downloads.get_remote_data',
        return_value=remote_data)
    mocker.patch(
        'securedrop_client.api_jobs.downloads.update_local_storage',
        return_value=remote_data)

    job.call_api('foo', session)

    assert get_remote_data_mock.call_count == 1
    uuid, public_key, fingerprint = import_key_mock.call_args[0]
    assert uuid == source.uuid
    assert public_key == source.key['public']
    assert fingerprint == source.key['fingerprint']


def test_MetadataSyncJob_success_with_missing_key(mocker, homedir, session, session_maker):
    """
    Check that we can gracefully handle missing source keys.
    """
    # Note the absence of a 'public' entry: the key data is incomplete, so no
    # import should be attempted for this source.
    source = mocker.MagicMock()
    source.uuid = 'bar'
    source.key = {
        'type': 'PGP',
        'pub_key': '',
        'fingerprint': ''
    }
    remote_data = ([source], 'submissions', 'replies')

    gpg = GpgHelper(homedir, session_maker, is_qubes=False)
    job = MetadataSyncJob(homedir, gpg)

    import_key_mock = mocker.patch.object(job.gpg, 'import_key')
    get_remote_data_mock = mocker.patch(
        'securedrop_client.api_jobs.downloads.get_remote_data',
        return_value=remote_data)
    mocker.patch(
        'securedrop_client.api_jobs.downloads.update_local_storage',
        return_value=remote_data)

    job.call_api('foo', session)

    assert get_remote_data_mock.call_count == 1
    assert import_key_mock.call_count == 0


def test_MessageDownloadJob_raises_NotImplementedError(mocker):
job = DownloadJob('mock')

Expand Down
Loading