Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: only work with prefixed jobs #23

Merged
merged 4 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/upparat/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
JOB_REVOKED = "job-revoked"

SELECT_JOB_INTERRUPTED = "selected-job-interrupted"
SELECT_JOB_ACTION_MISMATCH = "selected-job-action-mismatch"

DOWNLOAD_COMPLETED = "download-completed"
DOWNLOAD_INTERRUPTED = "download-interrupted"
Expand Down
17 changes: 15 additions & 2 deletions src/upparat/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

from upparat.config import settings

UPPARAT_ACTION = "upparat-update"
# FIXME: Prefix jobs since there could be
# multiple services consuming AWS IoT Jobs.
# Revisit this once AWS roles out AWS IoT Job
# Namespaces which would be the "correct" solution.
UPPARAT_JOB_PREFIX = "upparat_"

# AWS job execution
EXECUTION = "execution"
Expand All @@ -16,7 +20,6 @@
# AWS job document
JOB_ID = "jobId"
JOB_DOCUMENT = "jobDocument"
JOB_DOCUMENT_ACTION = "action"
JOB_DOCUMENT_FILE = "file"
JOB_DOCUMENT_VERSION = "version"
JOB_DOCUMENT_META = "meta"
Expand Down Expand Up @@ -80,6 +83,10 @@ class JobProgressStatus(Enum):
ERROR_MULTIPLE_IN_PROGRESS = "error_multiple_in_progress"


def is_upparat_job_id(job_id):
return job_id.startswith(UPPARAT_JOB_PREFIX)


def jobs_base(thing_name):
return f"$aws/things/{thing_name}/jobs/"

Expand Down Expand Up @@ -131,6 +138,12 @@ def job_update(mqtt_client, thing_name, job_id, status, state, message=None):
)


def filter_upparat_job_exectutions(job_executions):
return [
job for job in job_executions if job["jobId"].startswith(UPPARAT_JOB_PREFIX)
]


def job_update_multiple_as_failed(
mqtt_client, thing_name, job_ids, state, message=None
):
Expand Down
11 changes: 9 additions & 2 deletions src/upparat/statemachine/fetch_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.events import MQTT_SUBSCRIBED
from upparat.events import NO_JOBS_PENDING
from upparat.jobs import filter_upparat_job_exectutions
from upparat.jobs import get_pending_job_executions
from upparat.jobs import get_pending_job_executions_response
from upparat.statemachine import BaseState
Expand Down Expand Up @@ -54,8 +55,14 @@ def on_message(self, state, event):

# Handle accepted pending jobs executions
if topic_matches_sub(self.get_pending_job_executions_response, topic):
in_progress_job_executions = payload.get(IN_PROGRESS_JOBS, [])
queued_job_executions = payload.get(QUEUED_JOBS, [])
in_progress_job_executions = filter_upparat_job_exectutions(
payload.get(IN_PROGRESS_JOBS, [])
)

queued_job_executions = filter_upparat_job_exectutions(
payload.get(QUEUED_JOBS, [])
)

# If there are jobs available go to prepare state
if in_progress_job_executions or queued_job_executions:
logger.debug("Job executions available.")
Expand Down
5 changes: 0 additions & 5 deletions src/upparat/statemachine/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from upparat.events import JOBS_AVAILABLE
from upparat.events import NO_JOBS_PENDING
from upparat.events import RESTART_INTERRUPTED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.statemachine import UpparatStateMachine
from upparat.statemachine.download import DownloadState
Expand Down Expand Up @@ -72,10 +71,6 @@ def create_statemachine(event_queue, mqtt_client):
select_job_state, fetch_jobs_state, events=[SELECT_JOB_INTERRUPTED]
)

statemachine.add_transition(
select_job_state, monitor_state, events=[SELECT_JOB_ACTION_MISMATCH]
)

# Job is ready for process
statemachine.add_transition(verify_job_state, download_state, events=[JOB_VERIFIED])

Expand Down
11 changes: 9 additions & 2 deletions src/upparat/statemachine/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from upparat.events import MQTT_EVENT_PAYLOAD
from upparat.events import MQTT_EVENT_TOPIC
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.jobs import filter_upparat_job_exectutions
from upparat.jobs import pending_jobs_response
from upparat.statemachine import BaseState

Expand Down Expand Up @@ -41,8 +42,14 @@ def on_message(self, state, event):

if topic_matches_sub(self.job_pending_response, topic):
payload = json.loads(event.cargo[MQTT_EVENT_PAYLOAD])
in_progress_job_executions = payload["jobs"].get(JOBS_IN_PROGRESS, [])
queued_job_executions = payload["jobs"].get(JOBS_QUEUED, [])

in_progress_job_executions = filter_upparat_job_exectutions(
payload["jobs"].get(JOBS_IN_PROGRESS, [])
)

queued_job_executions = filter_upparat_job_exectutions(
payload["jobs"].get(JOBS_QUEUED, [])
)

# If there are jobs available go to job selection state
if in_progress_job_executions or queued_job_executions:
Expand Down
12 changes: 0 additions & 12 deletions src/upparat/statemachine/select_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
from upparat.events import MQTT_EVENT_TOPIC
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.events import MQTT_SUBSCRIBED
from upparat.events import SELECT_JOB_ACTION_MISMATCH
from upparat.events import SELECT_JOB_INTERRUPTED
from upparat.jobs import describe_job_execution
from upparat.jobs import describe_job_execution_response
from upparat.jobs import EXECUTION
from upparat.jobs import Job
from upparat.jobs import JOB_ACCEPTED
from upparat.jobs import JOB_DOCUMENT
from upparat.jobs import JOB_DOCUMENT_ACTION
from upparat.jobs import JOB_DOCUMENT_FILE
from upparat.jobs import JOB_DOCUMENT_FORCE
from upparat.jobs import JOB_DOCUMENT_META
Expand All @@ -34,7 +32,6 @@
from upparat.jobs import JOB_STATUS_DETAILS
from upparat.jobs import job_update_multiple_as_failed
from upparat.jobs import JobProgressStatus
from upparat.jobs import UPPARAT_ACTION
from upparat.statemachine import BaseState

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -131,15 +128,6 @@ def on_message(self, state, event):
if topic_matches_sub(accepted_topic, topic):
job_execution = payload[EXECUTION]
job_document = job_execution[JOB_DOCUMENT]
job_document_action = job_document.get(JOB_DOCUMENT_ACTION)

# something else can also publish jobs → make sure to only handle the ones for us
if job_document_action != UPPARAT_ACTION:
logger.info(
f"Job ignored: Job document does not match expected Upparat action field {UPPARAT_ACTION}." # noqa
)
self.publish(Event(SELECT_JOB_ACTION_MISMATCH))
return

job = Job(
id_=job_execution[JOB_ID],
Expand Down
4 changes: 3 additions & 1 deletion tests/statemachine/download_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import pytest

from ..utils import create_hook_event # noqa: F401
from ..utils import create_mqtt_message_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import DOWNLOAD_COMPLETED
from upparat.events import DOWNLOAD_INTERRUPTED
Expand Down Expand Up @@ -55,7 +57,7 @@ def download_state(mocker, tmpdir):
state = DownloadState()

state.job = Job(
id_="424242",
id_=generate_random_job_id(),
status=JobStatus.IN_PROGRESS,
file_url="https://foo.bar/baz",
version="1.1.1",
Expand Down
33 changes: 28 additions & 5 deletions tests/statemachine/fetch_jobs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ..utils import create_mqtt_message_event # noqa: F401
from ..utils import create_mqtt_subscription_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import JOBS_AVAILABLE
from upparat.events import MQTT_MESSAGE_RECEIVED
Expand All @@ -13,6 +14,16 @@
from upparat.statemachine import UpparatStateMachine
from upparat.statemachine.fetch_jobs import FetchJobsState

NON_UPPARAT_IN_PROGRESS_JOBS = [
{"jobId": "non_upparat_job_in_progress_1"},
{"jobId": "non_upparat_job_in_progress_2"},
]

NON_UPPARAT_QUEUED_JOBS = [
{"jobId": "non_upparat_job_queued_3"},
{"jobId": "non_upparat_job_queued_4"},
]


@pytest.fixture
def fetch_jobs_state(mocker):
Expand Down Expand Up @@ -63,7 +74,11 @@ def test_on_message_no_pending_jobs(fetch_jobs_state, create_mqtt_message_event)
state.on_enter(None, None)

topic = f"$aws/things/{settings.broker.thing_name}/jobs/get/+"
payload = {"queuedJobs": [], "inProgressJobs": []}

payload = {
"queuedJobs": NON_UPPARAT_QUEUED_JOBS,
"inProgressJobs": NON_UPPARAT_IN_PROGRESS_JOBS,
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand All @@ -79,9 +94,13 @@ def test_on_message_pending_queued_jobs(fetch_jobs_state, create_mqtt_message_ev
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

queued_job = {"jobId": "42"}
queued_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/get/+"
payload = {"queuedJobs": [queued_job], "inProgressJobs": []}

payload = {
"queuedJobs": NON_UPPARAT_QUEUED_JOBS + [queued_job],
"inProgressJobs": NON_UPPARAT_IN_PROGRESS_JOBS,
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand All @@ -104,9 +123,13 @@ def test_on_message_pending_progress_jobs(fetch_jobs_state, create_mqtt_message_
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

progress_job = {"jobId": "42"}
progress_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/get/+"
payload = {"queuedJobs": [], "inProgressJobs": [progress_job]}

payload = {
"queuedJobs": NON_UPPARAT_QUEUED_JOBS,
"inProgressJobs": NON_UPPARAT_IN_PROGRESS_JOBS + [progress_job],
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand Down
3 changes: 2 additions & 1 deletion tests/statemachine/install_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from ..utils import create_hook_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import HOOK
from upparat.events import HOOK_STATUS_COMPLETED
Expand All @@ -20,7 +21,7 @@
from upparat.statemachine.install import InstallState

JOB_ = Job(
"42",
generate_random_job_id(),
JobStatus.IN_PROGRESS,
"http://foo.bar/baz.bin",
"1.0.0",
Expand Down
31 changes: 27 additions & 4 deletions tests/statemachine/monitor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@

from ..utils import create_mqtt_message_event # noqa: F401
from ..utils import create_mqtt_subscription_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import JOBS_AVAILABLE
from upparat.events import MQTT_MESSAGE_RECEIVED
from upparat.statemachine import UpparatStateMachine
from upparat.statemachine.monitor import MonitorState

NON_UPPARAT_IN_PROGRESS_JOBS = [
{"jobId": "non_upparat_job_in_progress_1"},
{"jobId": "non_upparat_job_in_progress_2"},
]

NON_UPPARAT_QUEUED_JOBS = [
{"jobId": "non_upparat_job_queued_3"},
{"jobId": "non_upparat_job_queued_4"},
]


@pytest.fixture
def monitor_state(mocker):
Expand Down Expand Up @@ -72,9 +83,15 @@ def test_on_message_pending_queued_jobs(monitor_state, create_mqtt_message_event
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

queued_job = {"jobId": "42"}
queued_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/notify"
payload = {"jobs": {"IN_PROGRESS": [], "QUEUED": [queued_job]}}

payload = {
"jobs": {
"IN_PROGRESS": NON_UPPARAT_IN_PROGRESS_JOBS,
"QUEUED": NON_UPPARAT_QUEUED_JOBS + [queued_job],
}
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand All @@ -97,9 +114,15 @@ def test_on_message_pending_progress_jobs(monitor_state, create_mqtt_message_eve
settings.broker.thing_name = "bobby"
state.on_enter(None, None)

progress_job = {"jobId": "42"}
progress_job = {"jobId": generate_random_job_id()}
topic = f"$aws/things/{settings.broker.thing_name}/jobs/notify"
payload = {"jobs": {"IN_PROGRESS": [progress_job], "QUEUED": []}}

payload = {
"jobs": {
"IN_PROGRESS": NON_UPPARAT_IN_PROGRESS_JOBS + [progress_job],
"QUEUED": NON_UPPARAT_QUEUED_JOBS,
}
}

mqtt_message_event = create_mqtt_message_event(topic, payload)
state.on_message(None, mqtt_message_event)
Expand Down
3 changes: 2 additions & 1 deletion tests/statemachine/restart_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from ..utils import create_hook_event # noqa: F401
from ..utils import generate_random_job_id
from upparat.config import settings
from upparat.events import HOOK
from upparat.events import HOOK_STATUS_COMPLETED
Expand All @@ -19,7 +20,7 @@
from upparat.statemachine.restart import RestartState

JOB_ = Job(
"42",
generate_random_job_id(),
JobStatus.IN_PROGRESS,
"http://foo.bar/baz.bin",
"1.0.0",
Expand Down
Loading