Skip to content

Commit

Permalink
Update COPS draft implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 11, 2024
1 parent 9fc349b commit 83282a4
Showing 1 changed file with 139 additions and 77 deletions.
216 changes: 139 additions & 77 deletions src/cops.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::{
cmp::Ordering::{Equal, Greater},
collections::BTreeMap,
};
use std::{cmp::Ordering, collections::VecDeque};

use serde::{Deserialize, Serialize};

use crate::{
event::{erased::OnEvent, Timer},
event::{erased::OnEvent, SendEvent, Timer},
net::{events::Recv, Addr, All, SendMessage},
util::Payload,
};
Expand All @@ -18,138 +15,203 @@ pub type KeyId = u32;
pub struct Put<V, A> {
key: KeyId,
value: Payload,
deps: BTreeMap<KeyId, V>,
deps: V,
client_addr: A,
}

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct PutOk<V> {
version: V,
version_deps: V,
}

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct Get<A> {
pub struct Get<V, A> {
key: KeyId,
deps: V,
client_addr: A,
}

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct GetOk<V, A> {
put: Put<V, A>,
version: V,
pub struct GetOk<V> {
value: Payload,
version_deps: V,
}

#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
pub struct SyncKey<V, A> {
put: Put<V, A>,
version: V,
pub struct SyncKey<V> {
key: KeyId,
value: Payload,
version_deps: V,
}

pub trait ClientNet<A, V>: SendMessage<A, GetOk<V, A>> + SendMessage<A, PutOk<V>> {}
impl<T: SendMessage<A, GetOk<V, A>> + SendMessage<A, PutOk<V>>, A, V> ClientNet<A, V> for T {}
pub trait ClientNet<A, V>: SendMessage<A, GetOk<V>> + SendMessage<A, PutOk<V>> {}
impl<T: SendMessage<A, GetOk<V>> + SendMessage<A, PutOk<V>>, A, V> ClientNet<A, V> for T {}

pub trait ServerNet<A, V>:
SendMessage<u8, Put<V, A>> + SendMessage<u8, Get<A>> + SendMessage<All, SyncKey<V, A>>
SendMessage<u8, Put<V, A>> + SendMessage<u8, Get<V, A>> + SendMessage<All, SyncKey<V>>
{
}
impl<
T: SendMessage<u8, Put<V, A>> + SendMessage<u8, Get<A>> + SendMessage<All, SyncKey<V, A>>,
T: SendMessage<u8, Put<V, A>> + SendMessage<u8, Get<V, A>> + SendMessage<All, SyncKey<V>>,
A,
V,
> ServerNet<A, V> for T
{
}

pub trait VersionService {
type Version;
fn merge_and_increment_once(
&self,
id: KeyId,
previous: Option<Self::Version>,
deps: Vec<Self::Version>,
) -> anyhow::Result<()>;
pub trait DepOrd {
fn dep_cmp(&self, other: &Self, id: KeyId) -> Ordering;
}

pub trait Version: PartialOrd + DepOrd + Clone + Send + Sync + 'static {}
impl<T: PartialOrd + DepOrd + Clone + Send + Sync + 'static> Version for T {}

pub struct IncompleteMerge<V>(pub V, pub V);

pub struct CompleteMerge<V> {
pub id: KeyId,
pub previous: V, // must be complete, consider encode this into type
pub deps: V, // either complete or incomplete
}

pub trait Version: PartialOrd + Clone + Send + Sync + 'static {}
impl<T: PartialOrd + Clone + Send + Sync + 'static> Version for T {}
pub struct IncompleteMerged<V>(pub V);

pub struct VersionOk<V, A>(pub Put<V, A>, pub V);
pub struct CompleteMerged<V> {
pub id: KeyId,
pub version_deps: V,
}

// pub struct Client<V> {
// }

pub struct Server<N, CN, VS, V, A> {
store: BTreeMap<KeyId, (Put<V, A>, V)>,
pub struct Server<N, CN, VS, V, A, _M = (N, CN, VS, V, A)> {
store: Vec<KeyState<V, A>>,
net: N,
client_net: CN,
#[allow(unused)]
version_worker: VS,
version_service: VS,
_m: std::marker::PhantomData<_M>,
}

impl<N, CN: ClientNet<A, V>, A: Addr, V: Version, VS> OnEvent<Recv<Get<A>>>
for Server<N, CN, VS, V, A>
{
fn on_event(&mut self, Recv(get): Recv<Get<A>>, _: &mut impl Timer) -> anyhow::Result<()> {
if let Some((put, version)) = self.store.get(&get.key) {
let get_ok = GetOk {
put: put.clone(),
version: version.clone(),
};
return self.client_net.send(get.client_addr, get_ok);
}
Ok(())
}
struct KeyState<V, A> {
value: Payload,
version_deps: V,
pending_puts: VecDeque<Put<V, A>>,
}

impl<N: ServerNet<A, V>, CN: ClientNet<A, V>, A, V: Version, VS: VersionService<Version = V>>
OnEvent<Recv<Put<V, A>>> for Server<N, CN, VS, V, A>
pub trait ServerCommon {
type N: ServerNet<Self::A, Self::V>;
type CN: ClientNet<Self::A, Self::V>;
type VS: SendEvent<CompleteMerge<Self::V>>;
type V: Version;
type A: Addr;
}
impl<N, CN, VS, V, A> ServerCommon for (N, CN, VS, V, A)
where
N: ServerNet<A, V>,
CN: ClientNet<A, V>,
VS: SendEvent<CompleteMerge<V>>,
V: Version,
A: Addr,
{
fn on_event(&mut self, Recv(put): Recv<Put<V, A>>, _: &mut impl Timer) -> anyhow::Result<()> {
if let Some((_, version)) = self.store.get(&put.key) {
if put
.deps
.iter()
.all(|(_, v)| matches!(version.partial_cmp(v), Some(Greater | Equal)))
{
let put_ok = PutOk {
version: version.clone(),
};
return self.client_net.send(put.client_addr, put_ok);
type N = N;
type CN = CN;
type VS = VS;
type V = V;
type A = A;
}

impl<M: ServerCommon> OnEvent<Recv<Get<M::V, M::A>>> for Server<M::N, M::CN, M::VS, M::V, M::A, M> {
fn on_event(
&mut self,
Recv(get): Recv<Get<M::V, M::A>>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
for (id, state) in self.store.iter().enumerate() {
if get.deps.dep_cmp(&state.version_deps, id as _).is_gt() {
//
return Ok(());
}
}
Ok(())
let state = &self.store[get.key as usize];
let get_ok = GetOk {
value: state.value.clone(),
version_deps: state.version_deps.clone(),
};
self.client_net.send(get.client_addr, get_ok)
}
}

impl<N, CN: ClientNet<A, V>, A: Addr, V: Version, VS: VersionService<Version = V>>
OnEvent<Recv<SyncKey<V, A>>> for Server<N, CN, VS, V, A>
{
impl<M: ServerCommon> OnEvent<Recv<Put<M::V, M::A>>> for Server<M::N, M::CN, M::VS, M::V, M::A, M> {
fn on_event(
&mut self,
Recv(sync): Recv<SyncKey<V, A>>,
Recv(put): Recv<Put<M::V, M::A>>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
// TODO
self.store.insert(sync.put.key, (sync.put, sync.version));
for (id, state) in self.store.iter().enumerate() {
if put.deps.dep_cmp(&state.version_deps, id as _).is_gt() {
//
return Ok(());
}
}
let state = &mut self.store[put.key as usize];
state.pending_puts.push_back(put.clone());
if state.pending_puts.len() == 1 {
let merge = CompleteMerge {
id: put.key,
previous: state.version_deps.clone(),
deps: put.deps,
};
self.version_service.send(merge)?
}
Ok(())
}
}

impl<
N: ServerNet<A, V>,
CN: ClientNet<A, V>,
A,
V: PartialOrd + Clone,
VS: VersionService<Version = V>,
> OnEvent<VersionOk<V, A>> for Server<N, CN, VS, V, A>
{
impl<M: ServerCommon> OnEvent<Recv<SyncKey<M::V>>> for Server<M::N, M::CN, M::VS, M::V, M::A, M> {
fn on_event(
&mut self,
VersionOk(put, version): VersionOk<V, A>,
Recv(_): Recv<SyncKey<M::V>>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
let sync_key = SyncKey { put, version };
self.net.send(All, sync_key)
// TODO
Ok(())
}
}

impl<M: ServerCommon> OnEvent<CompleteMerged<M::V>> for Server<M::N, M::CN, M::VS, M::V, M::A, M> {
fn on_event(&mut self, merged: CompleteMerged<M::V>, _: &mut impl Timer) -> anyhow::Result<()> {
let state = &mut self.store[merged.id as usize];
let Some(put) = state.pending_puts.pop_front() else {
anyhow::bail!("missing pending puts")
};
anyhow::ensure!(matches!(
merged.version_deps.partial_cmp(&put.deps),
Some(Ordering::Greater)
));
anyhow::ensure!(matches!(
merged.version_deps.partial_cmp(&state.version_deps),
Some(Ordering::Greater)
));
state.value = put.value.clone();
state.version_deps = merged.version_deps.clone();
let put_ok = PutOk {
version_deps: merged.version_deps.clone(),
};
self.client_net.send(put.client_addr, put_ok)?;
let sync_key = SyncKey {
key: put.key,
value: put.value,
version_deps: merged.version_deps.clone(),
};
self.net.send(All, sync_key)?;
if let Some(pending_put) = state.pending_puts.front() {
let merge = CompleteMerge {
id: merged.id,
previous: merged.version_deps,
deps: pending_put.deps.clone(),
};
self.version_service.send(merge)?
}
Ok(())
}
}

0 comments on commit 83282a4

Please sign in to comment.