Skip to content

Commit

Permalink
Checkpoint before switching to Worker based solution
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Mar 30, 2024
1 parent 261db41 commit 6704acb
Showing 1 changed file with 105 additions and 34 deletions.
139 changes: 105 additions & 34 deletions src/cops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ use std::{
};

use crate::{
event::{erased::OnEvent, Timer},
event::{erased::OnEvent, SendEvent, Timer},
message::Payload,
net::{events::Recv, SendMessage},
};

// "key" under COPS context, "id" under Boson's logical clock context
pub type KeyId = u32;

pub struct Put<V, A> {
key: Payload,
key: KeyId,
value: Payload,
deps: BTreeMap<Payload, V>,
deps: BTreeMap<KeyId, V>,
client_addr: A,
}

Expand All @@ -21,7 +24,7 @@ pub struct PutOk<V> {
}

pub struct Get<A> {
key: Payload,
key: KeyId,
client_addr: A,
}

Expand All @@ -31,13 +34,13 @@ pub struct GetOk<V> {
}

pub struct Sync<V, A> {
key: Payload,
key: KeyId,
version: V,
server_addr: A,
}

pub struct SyncOk<V> {
key: Payload,
key: KeyId,
value: Payload,
version: V,
}
Expand All @@ -64,28 +67,30 @@ impl<
}

pub struct CreateVersion<V> {
pub previous: V,
pub id: KeyId,
pub previous: Option<V>,
pub deps: Vec<V>,
}

pub struct CreateVersionOk<V>(pub V);
pub struct CreateVersionOk<V>(pub KeyId, pub V);

// pub struct Client<V> {
// }

pub struct Server<N, CN, V, A> {
store: BTreeMap<Payload, (Payload, V)>,
pub trait VersionService<V>: SendEvent<CreateVersion<V>> {}
impl<T: SendEvent<CreateVersion<V>>, V> VersionService<V> for T {}

pub struct Server<N, CN, S, V, A> {
store: BTreeMap<KeyId, (Payload, V)>,
version_zero: V,
pending_puts: Vec<Put<V, A>>,
pending_gets: Vec<Get<A>>,
net: N,
client_net: CN,
pending_puts: Vec<PendingPut<V, A>>,
}

struct PendingPut<V, A> {
message: Put<V, A>,
keys: HashSet<Payload>,
version_service: S,
}

impl<N, CN: ClientNet<A, V>, A, V: Clone> OnEvent<Recv<Get<A>>> for Server<N, CN, V, A> {
impl<N, CN: ClientNet<A, V>, A, V: Clone, S> OnEvent<Recv<Get<A>>> for Server<N, CN, S, V, A> {
fn on_event(&mut self, Recv(get): Recv<Get<A>>, _: &mut impl Timer) -> anyhow::Result<()> {
if let Some((value, version)) = self.store.get(&get.key) {
let get_ok = GetOk {
Expand All @@ -94,13 +99,14 @@ impl<N, CN: ClientNet<A, V>, A, V: Clone> OnEvent<Recv<Get<A>>> for Server<N, CN
};
return self.client_net.send(get.client_addr, get_ok);
}
// TODO
// TODO figure out who to sync with and send Sync
self.pending_gets.push(get);
Ok(())
}
}

impl<N: ServerNet<A, V>, CN: ClientNet<A, V>, A, V: PartialOrd + Clone> OnEvent<Recv<Put<V, A>>>
for Server<N, CN, V, A>
impl<N: ServerNet<A, V>, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: VersionService<V>>
OnEvent<Recv<Put<V, A>>> for Server<N, CN, S, V, A>
{
fn on_event(&mut self, Recv(put): Recv<Put<V, A>>, _: &mut impl Timer) -> anyhow::Result<()> {
if let Some((_, version)) = self.store.get(&put.key) {
Expand All @@ -124,23 +130,43 @@ impl<N: ServerNet<A, V>, CN: ClientNet<A, V>, A, V: PartialOrd + Clone> OnEvent<
return None;
}
}
Some(dep_key.clone())
Some(dep_key)
})
.collect::<HashSet<_>>();
if pending_sync_keys.is_empty() {
// TODO
let deps = put
.deps
.keys()
.map(|dep_key| self.store.get(dep_key).unwrap().1.clone())
.collect::<Vec<_>>();
let previous = self.store.get(&put.key).map(|(_, version)| version.clone());
// shortcut only when both condition holds
// if there are already dependencies on the first version, the version should not start
// with `version_zero` anymore
// if there is no dependency but it's not the first version, the version still need to
// be incremented
if deps.is_empty() && previous.is_none() {
self.store
.insert(put.key, (put.value, self.version_zero.clone()));
let put_ok = PutOk {
version: self.version_zero.clone(),
};
return self.client_net.send(put.client_addr, put_ok);
}
self.version_service.send(CreateVersion {
id: put.key,
previous,
deps,
})?;
}
self.pending_puts.push(PendingPut {
message: put,
keys: pending_sync_keys,
});
// TODO
// TODO figure out who to sync with and send Sync
self.pending_puts.push(put);
Ok(())
}
}

impl<N: ServerNet<A, V>, CN, A, V: PartialOrd + Clone> OnEvent<Recv<Sync<V, A>>>
for Server<N, CN, V, A>
impl<N: ServerNet<A, V>, CN, A, V: PartialOrd + Clone, S> OnEvent<Recv<Sync<V, A>>>
for Server<N, CN, S, V, A>
{
fn on_event(&mut self, Recv(sync): Recv<Sync<V, A>>, _: &mut impl Timer) -> anyhow::Result<()> {
if let Some((value, version)) = self.store.get(&sync.key) {
Expand All @@ -157,8 +183,8 @@ impl<N: ServerNet<A, V>, CN, A, V: PartialOrd + Clone> OnEvent<Recv<Sync<V, A>>>
}
}

impl<N, CN: ClientNet<A, V>, A, V: PartialOrd + Clone> OnEvent<Recv<SyncOk<V>>>
for Server<N, CN, V, A>
impl<N, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: VersionService<V>>
OnEvent<Recv<SyncOk<V>>> for Server<N, CN, S, V, A>
{
fn on_event(
&mut self,
Expand All @@ -170,9 +196,54 @@ impl<N, CN: ClientNet<A, V>, A, V: PartialOrd + Clone> OnEvent<Recv<SyncOk<V>>>
return Ok(());
}
}
self.store
.insert(sync_ok.key, (sync_ok.value, sync_ok.version));
// TODO
self.store.insert(
sync_ok.key,
(sync_ok.value.clone(), sync_ok.version.clone()),
);

let mut pending_gets = Vec::new();
for get in self.pending_gets.drain(..) {
if get.key != sync_ok.key {
pending_gets.push(get)
} else {
let get_ok = GetOk {
value: sync_ok.value.clone(),
version: sync_ok.version.clone(),
};
self.client_net.send(get.client_addr, get_ok)?
}
}
self.pending_gets = pending_gets;

for put in &self.pending_puts {
if put.deps.iter().all(|(dep_key, dep_version)| {
if let Some((_, version)) = self.store.get(dep_key) {
matches!(version.partial_cmp(dep_version), Some(Greater | Equal))
} else {
false
}
}) {
self.version_service.send(CreateVersion {
id: put.key,
previous: self.store.get(&put.key).map(|(_, version)| version.clone()),
deps: put.deps.values().cloned().collect(),
})?
}
}

Ok(())
}
}

impl<N, CN: ClientNet<A, V>, A, V: PartialOrd + Clone, S: VersionService<V>>
OnEvent<CreateVersionOk<V>> for Server<N, CN, S, V, A>
{
fn on_event(
&mut self,
CreateVersionOk(key, version): CreateVersionOk<V>,
_: &mut impl Timer,
) -> anyhow::Result<()> {
//
Ok(())
}
}

0 comments on commit 6704acb

Please sign in to comment.