From 6704acb87d4f3fc7bea5b69086cf762b9daa1227 Mon Sep 17 00:00:00 2001 From: sgdxbc Date: Sat, 30 Mar 2024 05:18:05 +0000 Subject: [PATCH] Checkpoint before switching to Worker based solution --- src/cops.rs | 139 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 105 insertions(+), 34 deletions(-) diff --git a/src/cops.rs b/src/cops.rs index b08e5acd..e0cc3ea9 100644 --- a/src/cops.rs +++ b/src/cops.rs @@ -4,15 +4,18 @@ use std::{ }; use crate::{ - event::{erased::OnEvent, Timer}, + event::{erased::OnEvent, SendEvent, Timer}, message::Payload, net::{events::Recv, SendMessage}, }; +// "key" under COPS context, "id" under Boson's logical clock context +pub type KeyId = u32; + pub struct Put { - key: Payload, + key: KeyId, value: Payload, - deps: BTreeMap, + deps: BTreeMap, client_addr: A, } @@ -21,7 +24,7 @@ pub struct PutOk { } pub struct Get { - key: Payload, + key: KeyId, client_addr: A, } @@ -31,13 +34,13 @@ pub struct GetOk { } pub struct Sync { - key: Payload, + key: KeyId, version: V, server_addr: A, } pub struct SyncOk { - key: Payload, + key: KeyId, value: Payload, version: V, } @@ -64,28 +67,30 @@ impl< } pub struct CreateVersion { - pub previous: V, + pub id: KeyId, + pub previous: Option, pub deps: Vec, } -pub struct CreateVersionOk(pub V); +pub struct CreateVersionOk(pub KeyId, pub V); // pub struct Client { // } -pub struct Server { - store: BTreeMap, +pub trait VersionService: SendEvent> {} +impl>, V> VersionService for T {} + +pub struct Server { + store: BTreeMap, + version_zero: V, + pending_puts: Vec>, + pending_gets: Vec>, net: N, client_net: CN, - pending_puts: Vec>, -} - -struct PendingPut { - message: Put, - keys: HashSet, + version_service: S, } -impl, A, V: Clone> OnEvent>> for Server { +impl, A, V: Clone, S> OnEvent>> for Server { fn on_event(&mut self, Recv(get): Recv>, _: &mut impl Timer) -> anyhow::Result<()> { if let Some((value, version)) = self.store.get(&get.key) { let get_ok = GetOk { @@ -94,13 +99,14 @@ impl, A, V: Clone> OnEvent>> for Server, CN: ClientNet, A, V: PartialOrd + Clone> OnEvent>> - for Server +impl, CN: ClientNet, A, V: PartialOrd + Clone, S: VersionService> + OnEvent>> for Server { fn on_event(&mut self, Recv(put): Recv>, _: &mut impl Timer) -> anyhow::Result<()> { if let Some((_, version)) = self.store.get(&put.key) { @@ -124,23 +130,43 @@ impl, CN: ClientNet, A, V: PartialOrd + Clone> OnEvent< return None; } } - Some(dep_key.clone()) + Some(dep_key) }) .collect::>(); if pending_sync_keys.is_empty() { - // TODO + let deps = put + .deps + .keys() + .map(|dep_key| self.store.get(dep_key).unwrap().1.clone()) + .collect::>(); + let previous = self.store.get(&put.key).map(|(_, version)| version.clone()); + // shortcut only when both condition holds + // if there are already dependencies on the first version, the version should not start + // with `version_zero` anymore + // if there is no dependency but it's not the first version, the version still need to + // be incremented + if deps.is_empty() && previous.is_none() { + self.store + .insert(put.key, (put.value, self.version_zero.clone())); + let put_ok = PutOk { + version: self.version_zero.clone(), + }; + return self.client_net.send(put.client_addr, put_ok); + } + self.version_service.send(CreateVersion { + id: put.key, + previous, + deps, + })?; } - self.pending_puts.push(PendingPut { - message: put, - keys: pending_sync_keys, - }); - // TODO + // TODO figure out who to sync with and send Sync + self.pending_puts.push(put); Ok(()) } } -impl, CN, A, V: PartialOrd + Clone> OnEvent>> - for Server +impl, CN, A, V: PartialOrd + Clone, S> OnEvent>> + for Server { fn on_event(&mut self, Recv(sync): Recv>, _: &mut impl Timer) -> anyhow::Result<()> { if let Some((value, version)) = self.store.get(&sync.key) { @@ -157,8 +183,8 @@ impl, CN, A, V: PartialOrd + Clone> OnEvent>> } } -impl, A, V: PartialOrd + Clone> OnEvent>> - for Server +impl, A, V: PartialOrd + Clone, S: VersionService> + OnEvent>> for Server { fn on_event( &mut self, @@ -170,9 +196,54 @@ impl, A, V: PartialOrd + Clone> OnEvent>> return Ok(()); } } - self.store - .insert(sync_ok.key, (sync_ok.value, sync_ok.version)); - // TODO + self.store.insert( + sync_ok.key, + (sync_ok.value.clone(), sync_ok.version.clone()), + ); + + let mut pending_gets = Vec::new(); + for get in self.pending_gets.drain(..) { + if get.key != sync_ok.key { + pending_gets.push(get) + } else { + let get_ok = GetOk { + value: sync_ok.value.clone(), + version: sync_ok.version.clone(), + }; + self.client_net.send(get.client_addr, get_ok)? + } + } + self.pending_gets = pending_gets; + + for put in &self.pending_puts { + if put.deps.iter().all(|(dep_key, dep_version)| { + if let Some((_, version)) = self.store.get(dep_key) { + matches!(version.partial_cmp(dep_version), Some(Greater | Equal)) + } else { + false + } + }) { + self.version_service.send(CreateVersion { + id: put.key, + previous: self.store.get(&put.key).map(|(_, version)| version.clone()), + deps: put.deps.values().cloned().collect(), + })? + } + } + + Ok(()) + } +} + +impl, A, V: PartialOrd + Clone, S: VersionService> + OnEvent> for Server +{ + fn on_event( + &mut self, + CreateVersionOk(key, version): CreateVersionOk, + _: &mut impl Timer, + ) -> anyhow::Result<()> { + // Ok(()) } }