Skip to content

Commit

Permalink
Implement sys_get_call_invocation_id and sys_cancel_invocation
Browse files Browse the repository at this point in the history
Refactor header macro

New protocol
  • Loading branch information
slinkydeveloper committed Sep 27, 2024
1 parent 40afad2 commit 88ebe18
Show file tree
Hide file tree
Showing 11 changed files with 511 additions and 220 deletions.
39 changes: 38 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,21 @@ impl From<AsyncResultHandle> for u32 {
}
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SendHandle(u32);

impl From<u32> for SendHandle {
fn from(value: u32) -> Self {
SendHandle(value)
}
}

impl From<SendHandle> for u32 {
fn from(value: SendHandle) -> Self {
value.0
}
}

#[derive(Debug, Eq, PartialEq)]
pub enum Value {
/// a void/None/undefined success
Expand All @@ -110,6 +125,8 @@ pub enum Value {
Failure(Failure),
/// Only returned for get_state_keys
StateKeys(Vec<String>),
/// Only returned for get_call_invocation_id
InvocationId(String),
CombinatorResult(Vec<AsyncResultHandle>),
}

Expand Down Expand Up @@ -159,6 +176,19 @@ impl From<NonEmptyValue> for Value {
}
}

#[derive(Debug, Eq, PartialEq)]
pub enum GetInvocationIdTarget {
CallEntry(AsyncResultHandle),
SendEntry(SendHandle),
}

#[derive(Debug, Eq, PartialEq)]
pub enum CancelInvocationTarget {
InvocationId(String),
CallEntry(AsyncResultHandle),
SendEntry(SendHandle),
}

#[derive(Debug, Eq, PartialEq)]
pub enum TakeOutputResult {
Buffer(Bytes),
Expand Down Expand Up @@ -242,7 +272,7 @@ pub trait VM: Sized {
target: Target,
input: Bytes,
execution_time_since_unix_epoch: Option<Duration>,
) -> VMResult<()>;
) -> VMResult<SendHandle>;

fn sys_awakeable(&mut self) -> VMResult<(String, AsyncResultHandle)>;

Expand All @@ -266,6 +296,13 @@ pub trait VM: Sized {
retry_policy: RetryPolicy,
) -> VMResult<AsyncResultHandle>;

fn sys_get_call_invocation_id(
&mut self,
call: GetInvocationIdTarget,
) -> VMResult<AsyncResultHandle>;

fn sys_cancel_invocation(&mut self, target: CancelInvocationTarget) -> VMResult<()>;

fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;

fn sys_end(&mut self) -> VMResult<()>;
Expand Down
52 changes: 52 additions & 0 deletions src/service_protocol/generated/dev.restate.service.protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,52 @@ pub mod run_entry_message {
Failure(super::Failure),
}
}
/// Completable: No
/// Fallible: Yes
/// Type: 0x0C00 + 6
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelInvocationEntryMessage {
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
#[prost(oneof = "cancel_invocation_entry_message::Target", tags = "1, 2")]
pub target: ::core::option::Option<cancel_invocation_entry_message::Target>,
}
/// Nested message and enum types in `CancelInvocationEntryMessage`.
pub mod cancel_invocation_entry_message {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Target {
/// Target invocation id to cancel
#[prost(string, tag = "1")]
InvocationId(::prost::alloc::string::String),
/// Target index of the call/one way call journal entry in this journal.
#[prost(uint32, tag = "2")]
CallEntryIndex(u32),
}
}
/// Completable: Yes
/// Fallible: Yes
/// Type: 0x0C00 + 7
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetCallInvocationIdEntryMessage {
/// Index of the call/one way call journal entry in this journal.
#[prost(uint32, tag = "1")]
pub call_entry_index: u32,
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
#[prost(oneof = "get_call_invocation_id_entry_message::Result", tags = "14, 15")]
pub result: ::core::option::Option<get_call_invocation_id_entry_message::Result>,
}
/// Nested message and enum types in `GetCallInvocationIdEntryMessage`.
pub mod get_call_invocation_id_entry_message {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Result {
#[prost(string, tag = "14")]
Value(::prost::alloc::string::String),
#[prost(message, tag = "15")]
Failure(super::Failure),
}
}
/// This failure object carries user visible errors,
/// e.g. invocation failure return value or failure result of an InvokeEntryMessage.
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -501,6 +547,10 @@ pub enum ServiceProtocolVersion {
/// Added
/// * Entry retry mechanism: ErrorMessage.next_retry_delay, StartMessage.retry_count_since_last_stored_entry and StartMessage.duration_since_last_stored_entry
V2 = 2,
/// Added
/// * New entry to cancel invocations: CancelInvocationEntryMessage
/// * New entry to retrieve the invocation id: GetCallInvocationIdEntryMessage
V3 = 3,
}
impl ServiceProtocolVersion {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -512,6 +562,7 @@ impl ServiceProtocolVersion {
Self::Unspecified => "SERVICE_PROTOCOL_VERSION_UNSPECIFIED",
Self::V1 => "V1",
Self::V2 => "V2",
Self::V3 => "V3",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
Expand All @@ -520,6 +571,7 @@ impl ServiceProtocolVersion {
"SERVICE_PROTOCOL_VERSION_UNSPECIFIED" => Some(Self::Unspecified),
"V1" => Some(Self::V1),
"V2" => Some(Self::V2),
"V3" => Some(Self::V3),
_ => None,
}
}
Expand Down
Loading

0 comments on commit 88ebe18

Please sign in to comment.