Skip to content

Commit

Permalink
Checkpoint COPS with version worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Mar 30, 2024
1 parent 6704acb commit dca0154
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 76 deletions.
110 changes: 71 additions & 39 deletions src/cops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
event::{erased::OnEvent, SendEvent, Timer},
message::Payload,
net::{events::Recv, SendMessage},
worker::erased::Worker,
};

// "key" under COPS context, "id" under Boson's logical clock context
Expand All @@ -33,13 +34,13 @@ pub struct GetOk<V> {
version: V,
}

pub struct Sync<V, A> {
pub struct SyncKey<V, A> {
key: KeyId,
version: V,
server_addr: A,
}

pub struct SyncOk<V> {
pub struct SyncKeyOk<V> {
key: KeyId,
value: Payload,
version: V,
Expand All @@ -51,46 +52,52 @@ impl<T: SendMessage<A, GetOk<V>> + SendMessage<A, PutOk<V>>, A, V> ClientNet<A,
pub trait ServerNet<A, V>:
SendMessage<A, Put<V, A>>
+ SendMessage<A, Get<A>>
+ SendMessage<A, Sync<V, A>>
+ SendMessage<A, SyncOk<V>>
+ SendMessage<A, SyncKey<V, A>>
+ SendMessage<A, SyncKeyOk<V>>
{
}
impl<
T: SendMessage<A, Put<V, A>>
+ SendMessage<A, Get<A>>
+ SendMessage<A, Sync<V, A>>
+ SendMessage<A, SyncOk<V>>,
+ SendMessage<A, SyncKey<V, A>>
+ SendMessage<A, SyncKeyOk<V>>,
A,
V,
> ServerNet<A, V> for T
{
}

pub struct CreateVersion<V> {
pub id: KeyId,
pub previous: Option<V>,
pub deps: Vec<V>,
pub trait VersionService {
type Version;
fn merge_and_increment_once(
&self,
id: KeyId,
previous: Option<Self::Version>,
deps: Vec<Self::Version>,
) -> anyhow::Result<Self::Version>;
}

pub struct CreateVersionOk<V>(pub KeyId, pub V);
pub trait Version: PartialOrd + Clone + Send + Sync + 'static {}
impl<T: PartialOrd + Clone + Send + Sync + 'static> Version for T {}

pub struct VersionOk<V>(pub KeyId, pub V);

// pub struct Client<V> {
// }

pub trait VersionService<V>: SendEvent<CreateVersion<V>> {}
impl<T: SendEvent<CreateVersion<V>>, V> VersionService<V> for T {}

pub struct Server<N, CN, S, V, A> {
pub struct Server<N, CN, VS, E, V, A> {
store: BTreeMap<KeyId, (Payload, V)>,
version_zero: V,
pending_puts: Vec<Put<V, A>>,
pending_gets: Vec<Get<A>>,
net: N,
client_net: CN,
version_service: S,
version_worker: Worker<VS, E>,
}

impl<N, CN: ClientNet<A, V>, A, V: Clone, S> OnEvent<Recv<Get<A>>> for Server<N, CN, S, V, A> {
impl<N, CN: ClientNet<A, V>, A, V: Clone, VS, E> OnEvent<Recv<Get<A>>>
for Server<N, CN, VS, E, V, A>
{
fn on_event(&mut self, Recv(get): Recv<Get<A>>, _: &mut impl Timer) -> anyhow::Result<()> {
if let Some((value, version)) = self.store.get(&get.key) {
let get_ok = GetOk {
Expand All @@ -105,8 +112,14 @@ impl<N, CN: ClientNet<A, V>, A, V: Clone, S> OnEvent<Recv<Get<A>>> for Server<N,
}
}

impl<N: ServerNet<A, V>, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: VersionService<V>>
OnEvent<Recv<Put<V, A>>> for Server<N, CN, S, V, A>
impl<
N: ServerNet<A, V>,
CN: ClientNet<A, V>,
A,
V: Version,
VS: VersionService<Version = V>,
E: SendEvent<VersionOk<V>>,
> OnEvent<Recv<Put<V, A>>> for Server<N, CN, VS, E, V, A>
{
fn on_event(&mut self, Recv(put): Recv<Put<V, A>>, _: &mut impl Timer) -> anyhow::Result<()> {
if let Some((_, version)) = self.store.get(&put.key) {
Expand Down Expand Up @@ -153,25 +166,29 @@ impl<N: ServerNet<A, V>, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: Versi
};
return self.client_net.send(put.client_addr, put_ok);
}
self.version_service.send(CreateVersion {
id: put.key,
previous,
deps,
})?;
self.version_worker
.submit(Box::new(move |service, sender| {
let version = service.merge_and_increment_once(put.key, previous, deps)?;
sender.send(VersionOk(put.key, version))
}))?;
}
// TODO figure out who to sync with and send `SyncKey`
self.pending_puts.push(put);
Ok(())
}
}

impl<N: ServerNet<A, V>, CN, A, V: PartialOrd + Clone, S> OnEvent<Recv<Sync<V, A>>>
for Server<N, CN, S, V, A>
impl<N: ServerNet<A, V>, CN, A, V: PartialOrd + Clone, VS, E> OnEvent<Recv<SyncKey<V, A>>>
for Server<N, CN, VS, E, V, A>
{
fn on_event(&mut self, Recv(sync): Recv<Sync<V, A>>, _: &mut impl Timer) -> anyhow::Result<()> {
fn on_event(
&mut self,
Recv(sync): Recv<SyncKey<V, A>>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
if let Some((value, version)) = self.store.get(&sync.key) {
if matches!(version.partial_cmp(&sync.version), Some(Greater | Equal)) {
let sync_ok = SyncOk {
let sync_ok = SyncKeyOk {
key: sync.key,
value: value.clone(),
version: version.clone(),
Expand All @@ -183,12 +200,18 @@ impl<N: ServerNet<A, V>, CN, A, V: PartialOrd + Clone, S> OnEvent<Recv<Sync<V, A
}
}

impl<N, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: VersionService<V>>
OnEvent<Recv<SyncOk<V>>> for Server<N, CN, S, V, A>
impl<
N,
CN: ClientNet<A, V>,
A,
V: Version,
VS: VersionService<Version = V>,
E: SendEvent<VersionOk<V>>,
> OnEvent<Recv<SyncKeyOk<V>>> for Server<N, CN, VS, E, V, A>
{
fn on_event(
&mut self,
Recv(sync_ok): Recv<SyncOk<V>>,
Recv(sync_ok): Recv<SyncKeyOk<V>>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
if let Some((_, version)) = self.store.get(&sync_ok.key) {
Expand Down Expand Up @@ -223,24 +246,33 @@ impl<N, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: VersionService<V>>
false
}
}) {
self.version_service.send(CreateVersion {
id: put.key,
previous: self.store.get(&put.key).map(|(_, version)| version.clone()),
deps: put.deps.values().cloned().collect(),
})?
let id = put.key;
let previous = self.store.get(&put.key).map(|(_, version)| version.clone());
let deps = put.deps.values().cloned().collect();
self.version_worker
.submit(Box::new(move |service, sender| {
let version = service.merge_and_increment_once(id, previous, deps)?;
sender.send(VersionOk(id, version))
}))?
}
}

Ok(())
}
}

impl<N, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: VersionService<V>>
OnEvent<CreateVersionOk<V>> for Server<N, CN, S, V, A>
impl<
N,
CN: ClientNet<A, V>,
A,
V: PartialOrd + Clone,
VS: VersionService<Version = V>,
E: SendEvent<VersionOk<V>>,
> OnEvent<VersionOk<V>> for Server<N, CN, VS, E, V, A>
{
fn on_event(
&mut self,
CreateVersionOk(key, version): CreateVersionOk<V>,
VersionOk(key, version): VersionOk<V>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
//
Expand Down
2 changes: 1 addition & 1 deletion src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sha2::{Digest, Sha256};
// There's no well known solution for deriving digest methods for general
// structural data i.e. structs and enums (as far as I know), which means to
// compute digest for a structural data e.g. message type, one has to do either:
// specify the tranversal manually
// specify the traversal manually
// derive `Hash` and make use of it
// derive `Serialize` and make use of it
// derive `BorshSerialize`, which is similar to `Serialize` but has been
Expand Down
56 changes: 28 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod workload;
//
// there are a lot of generics around this codebase, naming convention
// is roughly: `M`/`N` for messages/events, `S`/`U` for state machine (that
// ususally `impl OnEvent`) (can also use `T` when it is not confused with
// usually `impl OnEvent`) (can also use `T` when it is not confused with
// representing general types), `N`/`M` for nets, `E` for `impl SendEvent`
// (i.e. emitter), `A`/`B` for addresses
// (yeah, i also just realized the unfortunate reusing of `M`/`N`)
Expand All @@ -33,36 +33,36 @@ pub mod workload;
// no particular reason, just a "std" module causes unnecessary problems
//
// although it does not look like it, the codebase does not have a tendency of
// using generic/trait ploymophism everywhere. enum polymophism is used as long
// as the case is simple enough to be supported by it, e.g. crypto, worker, etc.
// the user of this codebase is generally assumed to keep this codebase at a
// writable place (i.e. not as a git/crates.io dependency), so more enum
// using generic/trait polymorphism everywhere. enum polymorphism is used as
// long as the case is simple enough to be supported by it, e.g. crypto, worker,
// etc. the user of this codebase is generally assumed to keep this codebase at
// a writable place (i.e. not as a git/crates.io dependency), so more enum
// variants can always be added if necessary.
//
// one case that must use trait is when type erasuring is required, that is, the
// one case that must use trait is when type erasure is required, that is, the
// type parameters of implementors must not be exposed to the use-site. this is
// the case for e.g. `SendEvent`.
//
// for `SendMessage` it's more about code organization. if it is instead an enum
// then another enum must be constructed to hold all message types. having that
// gaint enum feels ridiculous. a `SendMessage` trait also brings additional
// giant enum feels ridiculous. a `SendMessage` trait also brings additional
// benefits like allowing me to come up with fancy things like `SendAddr` :)
//
// the rationale for the address type generic (notice that the type is not
// necessarily `impl Addr`) is that most components only work with one address
// type (plus some auxiliry types like `All`), so an address enum will introduce
// many runtime checks and places that bugs may sneak in. it is anonying to keep
// a generic address type all over the place, but i have to choose between
// either this or anonying downcasts every time the address is evaluated. i
// decide to go down with the type parameter since it make better use of the
// compiler
// type (plus some auxiliary types like `All`), so an address enum will
// introduce many runtime checks and places that bugs may sneak in. it is
// annoying to keep a generic address type all over the place, but i have to
// choose between either this or annoying downcasts every time the address is
// evaluated. i decide to go down with the type parameter since it make better
// use of the compiler
//
// when working with these traits, either to keep type parameters or make them
// into trait objects (if possible) is mostly a matter of taste. there are
// certain cases where a type parameter must be introduced e.g. when it is
// propagated across interfaces, and ther are also other cases where a trait
// propagated across interfaces, and there are also other cases where a trait
// object must be used e.g. when sending events in circle while working with
// type-erasured events (so the `impl OnEvent` type appears on `impl SendEvent`
// type-erased events (so the `impl OnEvent` type appears on `impl SendEvent`
// instead of the event types), and trait objects must be used somewhere to
// break infinite type recursion. tentatively the rule of thumb is to allocate
// type parameters for the types that hold nonvolatile states e.g. the
Expand All @@ -83,7 +83,7 @@ pub mod workload;
// scripts. there's no dependency from `tools/` to `src/`. there's no dependency
// from `src/` to `tools/` except some shared serialization schemas. the
// interfaces between them are built based on HTTP. i believe this isolated
// design results in good separation of conern
// design results in good separation of concern
//
// for `crates` the story is much simpler. `src/` should be compilable against
// a plain Ubuntu system. and code lives in `crates` if it has further system
Expand All @@ -103,16 +103,16 @@ pub mod workload;
//
// i choose to not build universal connection between them because they are
// expected to be used in different context: `SendEvent` probably sends to local
// receipant i.e. `impl OnEvent`, thus it's possible to work with type-erased
// recipient i.e. `impl OnEvent`, thus it's possible to work with type-erased
// events; `SendMessage` in contrast sends to "the outside world", so the
// sending cannot be directly hooked up with the consuming, so type erasure
// cannot be performed and message types are probably plain old types. also,
// `impl SendEvent` guarantees reliable delivery, while it's ok for
// `impl SendMessage` to silently fail
//
// i feel sorry to have so many `anyhow::Result<()>` all over the codebase. i
// should have aliased it if i foreseed it would be this many. nevertheless, i
// guess the enhancement of std error will not get stablized in any time soon,
// should have aliased it if i foresaw it would be this many. nevertheless, i
// guess the enhancement of std error will not get stabilized in any time soon,
// so there is still a long time before i need to consider replacing these
// results with `Result<(), Box<dyn Error + Send + Sync>>`. probably the
// codebase will be long gone before that
Expand All @@ -129,17 +129,17 @@ pub mod workload;
// panic assertion in heap allocation code path (at least in glibc), which
// means some cases of failing to fulfill an unsafe contract are already
// converted into panicking, or in another direction, encountering a panic
// already may imply failing to filfull some contracts
// already may imply failing to fulfill some contracts
//
// use unstructured/ad-hoc error i.e. anyhow::anyhow!() when there's no upper
// layer wanting to handle that error, so it is supposed to lead to a crash. the
// oppotunity of making such conclusion is usually rare, but in this codebase
// opportunity of making such conclusion is usually rare, but in this codebase
// i happen to get a lot of chances, since i am the only downstream user of
// the library part of the code. for example, the structured errors that do
// not exist at present but i would like to have are the ones for disconnected
// channels. those are usually because of unexpected exiting that is caused by
// some other more foundamental errors, so filter them off helps focus on
// other more meaningful errors
// some other more fundamental errors, so filter them off helps focus on other
// more meaningful errors
//
// and lastly, create on-demand structured errors when it may be captured and
// downcast later. in some sense unstructured error offers more information on
Expand All @@ -152,7 +152,7 @@ pub mod workload;
// still possible to happen, so the artifacts must be prepared for both to
// happen and try their best to do reliable crashing. this is the main reason
// that i used to panic only in the past. the other reasons include panic has
// builtin backtrace support (which denimished after anyhow supports backtrace
// builtin backtrace support (which diminished after anyhow supports backtrace
// in stable), and panic is reported when it's happening (no matter whether or
// not get caught later), so it's very simple to reliably both crash and report
// by using panic and `panic = "abort"` in cargo config. The latter has
Expand All @@ -168,12 +168,12 @@ pub mod workload;
// use panic is to save some code that i assert will never be executed, so it's
// ok to have false positive error-returning code: that's just some unreachable
// code that is too hard to be proved to be dead code. however, if my assertion
// on unreachability is incorrect and the program does panic, then i should
// replace the panic with the code that previously saved e.g. return an error
// instead.
// on the "unreachability" is incorrect and the program does panic, then i
// should replace the panic with the code that previously saved e.g. return an
// error instead
//
// on the other hand, unstructured error defines "under what condition this code
// suppose to work". if the conidition is just temporarily unsatisfied, i can
// suppose to work". if the condition is just temporarily unsatisfied, i can
// safely ignore this transient error; otherwise, i should think about how to
// improve the implementation to make it workable in more situations.
//
Expand Down
16 changes: 8 additions & 8 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::event::SendEvent;
// use crate::event::SendEvent;

// TODO find a use case for non-erased (type-preserved?) variant
// or just remove it altogether. anyway no performance gain here
pub type Work<S, M> = erased::Work<S, dyn SendEvent<M>>;
pub type Worker<S, M> = erased::Worker<S, dyn SendEvent<M>>;
// pub type Work<S, M> = erased::Work<S, dyn SendEvent<M>>;
// pub type Worker<S, M> = erased::Worker<S, dyn SendEvent<M>>;

pub type SpawnExecutor<S, M> = erased::SpawnExecutor<S, dyn SendEvent<M>>;
pub type SpawnWorker<S, M> = erased::SpawnWorker<S, dyn SendEvent<M>>;
// pub type SpawnExecutor<S, M> = erased::SpawnExecutor<S, dyn SendEvent<M>>;
// pub type SpawnWorker<S, M> = erased::SpawnWorker<S, dyn SendEvent<M>>;

pub fn spawn_backend<S, M>(state: S) -> (Worker<S, M>, SpawnExecutor<S, M>) {
erased::spawn_backend(state)
}
// pub fn spawn_backend<S, M>(state: S) -> (Worker<S, M>, SpawnExecutor<S, M>) {
// erased::spawn_backend(state)
// }

pub mod erased {
use tokio::{
Expand Down

0 comments on commit dca0154

Please sign in to comment.