Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Nov 29, 2024
1 parent 59628f2 commit 98a00ee
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 21 deletions.
10 changes: 5 additions & 5 deletions crates/partition-store/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn delete_invocation_status<S: StorageAccess>(storage: &mut S, invocation_id: &I
storage.delete_key(&create_invocation_status_key(invocation_id));
}

fn invoked_invocations<S: StorageAccess>(
fn invoked_or_killed_invocations<S: StorageAccess>(
storage: &mut S,
partition_key_range: RangeInclusive<PartitionKey>,
) -> Vec<Result<InvokedOrKilledInvocationStatusLite>> {
Expand All @@ -185,7 +185,7 @@ fn invoked_invocations<S: StorageAccess>(
invocations.extend(storage.for_each_key_value_in_place(
FullScanPartitionKeyRange::<InvocationStatusKey>(partition_key_range),
|mut k, mut v| {
let result = read_invoked_full_invocation_id(&mut k, &mut v).transpose();
let result = read_invoked_or_killed_status_lite(&mut k, &mut v).transpose();
if let Some(res) = result {
TableScanIterationDecision::Emit(res)
} else {
Expand Down Expand Up @@ -254,7 +254,7 @@ fn read_invoked_v1_full_invocation_id(
}
}

fn read_invoked_full_invocation_id(
fn read_invoked_or_killed_status_lite(
mut k: &mut &[u8],
v: &mut &[u8],
) -> Result<Option<InvokedOrKilledInvocationStatusLite>> {
Expand Down Expand Up @@ -291,7 +291,7 @@ impl ReadOnlyInvocationStatusTable for PartitionStore {
fn all_invoked_or_killed_invocations(
&mut self,
) -> impl Stream<Item = Result<InvokedOrKilledInvocationStatusLite>> + Send {
stream::iter(invoked_invocations(
stream::iter(invoked_or_killed_invocations(
self,
self.partition_key_range().clone(),
))
Expand All @@ -317,7 +317,7 @@ impl<'a> ReadOnlyInvocationStatusTable for PartitionStoreTransaction<'a> {
fn all_invoked_or_killed_invocations(
&mut self,
) -> impl Stream<Item = Result<InvokedOrKilledInvocationStatusLite>> + Send {
stream::iter(invoked_invocations(
stream::iter(invoked_or_killed_invocations(
self,
self.partition_key_range().clone(),
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async fn verify_point_lookups<T: InvocationStatusTable>(txn: &mut T) {
);
}

async fn verify_all_svc_with_status_invoked<T: InvocationStatusTable>(txn: &mut T) {
async fn verify_all_svc_with_status_invoked_or_killed<T: InvocationStatusTable>(txn: &mut T) {
let actual = txn
.all_invoked_or_killed_invocations()
.try_collect::<Vec<_>>()
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn test_invocation_status() {
populate_data(&mut txn).await;

verify_point_lookups(&mut txn).await;
verify_all_svc_with_status_invoked(&mut txn).await;
verify_all_svc_with_status_invoked_or_killed(&mut txn).await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down
6 changes: 6 additions & 0 deletions crates/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ pub mod v1 {
invocation_target: Some(invocation_target.into()),
source: Some(source.into()),
span_context: Some(span_context.into()),
// SAFETY: We're only mapping data types here
creation_time: unsafe { timestamps.creation_time() }.as_u64(),
modification_time: unsafe { timestamps.modification_time() }.as_u64(),
inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() }
Expand Down Expand Up @@ -592,6 +593,7 @@ pub mod v1 {
invocation_target: Some(invocation_target.into()),
source: Some(source.into()),
span_context: Some(span_context.into()),
// SAFETY: We're only mapping data types here
creation_time: unsafe { timestamps.creation_time() }.as_u64(),
modification_time: unsafe { timestamps.modification_time() }.as_u64(),
inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() }
Expand Down Expand Up @@ -647,6 +649,7 @@ pub mod v1 {
invocation_target: Some(invocation_target.into()),
source: Some(source.into()),
span_context: Some(journal_metadata.span_context.into()),
// SAFETY: We're only mapping data types here
creation_time: unsafe { timestamps.creation_time() }.as_u64(),
modification_time: unsafe { timestamps.modification_time() }.as_u64(),
inboxed_transition_time: unsafe {
Expand Down Expand Up @@ -711,6 +714,7 @@ pub mod v1 {
invocation_target: Some(invocation_target.into()),
source: Some(source.into()),
span_context: Some(journal_metadata.span_context.into()),
// SAFETY: We're only mapping data types here
creation_time: unsafe { timestamps.creation_time() }.as_u64(),
modification_time: unsafe { timestamps.modification_time() }.as_u64(),
inboxed_transition_time: unsafe {
Expand Down Expand Up @@ -775,6 +779,7 @@ pub mod v1 {
invocation_target: Some(invocation_target.into()),
source: Some(source.into()),
span_context: Some(journal_metadata.span_context.into()),
// SAFETY: We're only mapping data types here
creation_time: unsafe { timestamps.creation_time() }.as_u64(),
modification_time: unsafe { timestamps.modification_time() }.as_u64(),
inboxed_transition_time: unsafe {
Expand Down Expand Up @@ -827,6 +832,7 @@ pub mod v1 {
invocation_target: Some(invocation_target.into()),
source: Some(source.into()),
span_context: Some(span_context.into()),
// SAFETY: We're only mapping data types here
creation_time: unsafe { timestamps.creation_time() }.as_u64(),
modification_time: unsafe { timestamps.modification_time() }.as_u64(),
inboxed_transition_time: unsafe { timestamps.inboxed_transition_time() }
Expand Down
10 changes: 6 additions & 4 deletions crates/worker/src/partition/leadership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,16 +476,18 @@ where
.map_err(Error::Invoker)?;

{
let invoked_invocations = partition_store.all_invoked_or_killed_invocations();
tokio::pin!(invoked_invocations);
let invoked_or_killed_invocations = partition_store.all_invoked_or_killed_invocations();
tokio::pin!(invoked_or_killed_invocations);

let mut count = 0;
while let Some(invoked_invocation) = invoked_invocations.next().await {
while let Some(invoked_or_killed_invocation) =
invoked_or_killed_invocations.next().await
{
let InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target,
is_invoked,
} = invoked_invocation?;
} = invoked_or_killed_invocation?;
if is_invoked {
invoker_handle
.invoke(
Expand Down
16 changes: 7 additions & 9 deletions crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,13 @@ where
let outbox_seq_number = partition_store.get_outbox_seq_number().await?;
let outbox_head_seq_number = partition_store.get_outbox_head_seq_number().await?;

let experimental_features = if disable_idempotency_table {
ExperimentalFeature::DisableIdempotencyTable.into()
} else {
EnumSet::empty()
} | if invocation_status_killed {
ExperimentalFeature::InvocationStatusKilled.into()
} else {
EnumSet::empty()
};
let mut experimental_features = EnumSet::empty();
if disable_idempotency_table {
experimental_features |= ExperimentalFeature::DisableIdempotencyTable;
}
if invocation_status_killed {
experimental_features |= ExperimentalFeature::InvocationStatusKilled;
}

let state_machine = StateMachine::new(
inbox_seq_number,
Expand Down
7 changes: 6 additions & 1 deletion crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
.await?
}
InvocationStatus::Killed(_) => {
trace!("Received kill command for an already killed invocation with id '{invocation_id}'.");
// Nothing to do here really, let's send again the abort signal to the invoker just in case
Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
}
Expand Down Expand Up @@ -988,6 +989,9 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
.await?
}
InvocationStatus::Killed(_) => {
trace!(
"Received cancel command for an already killed invocation '{invocation_id}'."
);
// Nothing to do here really, let's send again the abort signal to the invoker just in case
Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true);
}
Expand All @@ -1002,7 +1006,6 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
// This can happen because the invoke/resume and the abort invoker messages end up in different queues,
// and the abort message can overtake the invoke/resume.
// Consequently the invoker might have not received the abort and the user tried to send it again.
// TODO
Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false);
}
};
Expand Down Expand Up @@ -1717,6 +1720,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
invocation_id,
invocation_metadata.invocation_target.clone(),
invocation_metadata.journal_metadata.span_context.clone(),
// SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic.
unsafe { invocation_metadata.timestamps.creation_time() },
match &response_result {
ResponseResult::Success(_) => Ok(()),
Expand All @@ -1739,6 +1743,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
invocation_id,
invocation_target.clone(),
invocation_metadata.journal_metadata.span_context.clone(),
// SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic.
unsafe { invocation_metadata.timestamps.creation_time() },
Ok(()),
);
Expand Down

0 comments on commit 98a00ee

Please sign in to comment.