From 98a00ee7965415c2859537e22e21b38c0d84370f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 29 Nov 2024 19:23:55 +0100 Subject: [PATCH] Feedback --- .../src/invocation_status_table/mod.rs | 10 +++++----- .../tests/invocation_status_table_test/mod.rs | 4 ++-- crates/storage-api/src/storage.rs | 6 ++++++ crates/worker/src/partition/leadership.rs | 10 ++++++---- crates/worker/src/partition/mod.rs | 16 +++++++--------- crates/worker/src/partition/state_machine/mod.rs | 7 ++++++- 6 files changed, 32 insertions(+), 21 deletions(-) diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index ae6cdd19a4..6be2332df2 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -166,7 +166,7 @@ fn delete_invocation_status(storage: &mut S, invocation_id: &I storage.delete_key(&create_invocation_status_key(invocation_id)); } -fn invoked_invocations( +fn invoked_or_killed_invocations( storage: &mut S, partition_key_range: RangeInclusive, ) -> Vec> { @@ -185,7 +185,7 @@ fn invoked_invocations( invocations.extend(storage.for_each_key_value_in_place( FullScanPartitionKeyRange::(partition_key_range), |mut k, mut v| { - let result = read_invoked_full_invocation_id(&mut k, &mut v).transpose(); + let result = read_invoked_or_killed_status_lite(&mut k, &mut v).transpose(); if let Some(res) = result { TableScanIterationDecision::Emit(res) } else { @@ -254,7 +254,7 @@ fn read_invoked_v1_full_invocation_id( } } -fn read_invoked_full_invocation_id( +fn read_invoked_or_killed_status_lite( mut k: &mut &[u8], v: &mut &[u8], ) -> Result> { @@ -291,7 +291,7 @@ impl ReadOnlyInvocationStatusTable for PartitionStore { fn all_invoked_or_killed_invocations( &mut self, ) -> impl Stream> + Send { - stream::iter(invoked_invocations( + stream::iter(invoked_or_killed_invocations( self, self.partition_key_range().clone(), )) @@ -317,7 +317,7 @@ impl<'a> ReadOnlyInvocationStatusTable for PartitionStoreTransaction<'a> { fn all_invoked_or_killed_invocations( &mut self, ) -> impl Stream> + Send { - stream::iter(invoked_invocations( + stream::iter(invoked_or_killed_invocations( self, self.partition_key_range().clone(), )) diff --git a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs index 442a9ce3ba..dad3b0d5d2 100644 --- a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs @@ -172,7 +172,7 @@ async fn verify_point_lookups(txn: &mut T) { ); } -async fn verify_all_svc_with_status_invoked(txn: &mut T) { +async fn verify_all_svc_with_status_invoked_or_killed(txn: &mut T) { let actual = txn .all_invoked_or_killed_invocations() .try_collect::>() @@ -207,7 +207,7 @@ async fn test_invocation_status() { populate_data(&mut txn).await; verify_point_lookups(&mut txn).await; - verify_all_svc_with_status_invoked(&mut txn).await; + verify_all_svc_with_status_invoked_or_killed(&mut txn).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 4fce52acf4..485ce7cf30 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -540,6 +540,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } @@ -592,6 +593,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } @@ -647,6 +649,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(journal_metadata.span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { @@ -711,6 +714,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(journal_metadata.span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { @@ -775,6 +779,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(journal_metadata.span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { @@ -827,6 +832,7 @@ pub mod v1 { invocation_target: Some(invocation_target.into()), source: Some(source.into()), span_context: Some(span_context.into()), + // SAFETY: We're only mapping data types here creation_time: unsafe { timestamps.creation_time() }.as_u64(), modification_time: unsafe { timestamps.modification_time() }.as_u64(), inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() } diff --git a/crates/worker/src/partition/leadership.rs b/crates/worker/src/partition/leadership.rs index d8863c1391..45bfd221c5 100644 --- a/crates/worker/src/partition/leadership.rs +++ b/crates/worker/src/partition/leadership.rs @@ -476,16 +476,18 @@ where .map_err(Error::Invoker)?; { - let invoked_invocations = partition_store.all_invoked_or_killed_invocations(); - tokio::pin!(invoked_invocations); + let invoked_or_killed_invocations = partition_store.all_invoked_or_killed_invocations(); + tokio::pin!(invoked_or_killed_invocations); let mut count = 0; - while let Some(invoked_invocation) = invoked_invocations.next().await { + while let Some(invoked_or_killed_invocation) = + invoked_or_killed_invocations.next().await + { let InvokedOrKilledInvocationStatusLite { invocation_id, invocation_target, is_invoked, - } = invoked_invocation?; + } = invoked_or_killed_invocation?; if is_invoked { invoker_handle .invoke( diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index dbcbfb684a..08e6e98f63 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -226,15 +226,13 @@ where let outbox_seq_number = partition_store.get_outbox_seq_number().await?; let outbox_head_seq_number = partition_store.get_outbox_head_seq_number().await?; - let experimental_features = if disable_idempotency_table { - ExperimentalFeature::DisableIdempotencyTable.into() - } else { - EnumSet::empty() - } | if invocation_status_killed { - ExperimentalFeature::InvocationStatusKilled.into() - } else { - EnumSet::empty() - }; + let mut experimental_features = EnumSet::empty(); + if disable_idempotency_table { + experimental_features |= ExperimentalFeature::DisableIdempotencyTable; + } + if invocation_status_killed { + experimental_features |= ExperimentalFeature::InvocationStatusKilled; + } let state_machine = StateMachine::new( inbox_seq_number, diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 4d232e060f..4e695e682a 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -906,6 +906,7 @@ impl StateMachine { .await? } InvocationStatus::Killed(_) => { + trace!("Received kill command for an already killed invocation with id '{invocation_id}'."); // Nothing to do here really, let's send again the abort signal to the invoker just in case Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); } @@ -988,6 +989,9 @@ impl StateMachine { .await? } InvocationStatus::Killed(_) => { + trace!( + "Received cancel command for an already killed invocation '{invocation_id}'." + ); // Nothing to do here really, let's send again the abort signal to the invoker just in case Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); } @@ -1002,7 +1006,6 @@ impl StateMachine { // This can happen because the invoke/resume and the abort invoker messages end up in different queues, // and the abort message can overtake the invoke/resume. // Consequently the invoker might have not received the abort and the user tried to send it again. - // TODO Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); } }; @@ -1717,6 +1720,7 @@ impl StateMachine { invocation_id, invocation_metadata.invocation_target.clone(), invocation_metadata.journal_metadata.span_context.clone(), + // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. unsafe { invocation_metadata.timestamps.creation_time() }, match &response_result { ResponseResult::Success(_) => Ok(()), @@ -1739,6 +1743,7 @@ impl StateMachine { invocation_id, invocation_target.clone(), invocation_metadata.journal_metadata.span_context.clone(), + // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. unsafe { invocation_metadata.timestamps.creation_time() }, Ok(()), );