From 9f9875ccc96e4b1e3a0c012f5b3f4013ca3a68a5 Mon Sep 17 00:00:00 2001
From: Nikolay Volf
Date: Thu, 26 May 2016 08:29:39 +0200
Subject: [PATCH 01/13] deque capacity

---
 ipc/rpc/src/binary.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ipc/rpc/src/binary.rs b/ipc/rpc/src/binary.rs
index 17d7283af94..125f246fdab 100644
--- a/ipc/rpc/src/binary.rs
+++ b/ipc/rpc/src/binary.rs
@@ -319,8 +319,8 @@ pub fn deserialize_from<T, R>(r: &mut R) -> Result<T, BinaryConvertError>
 	let mut payload = Vec::new();
 	try!(r.read_to_end(&mut payload).map_err(|_| BinaryConvertError));
 
-	let mut length_stack = VecDeque::<usize>::new();
 	let stack_len = try!(u64::from_bytes(&payload[0..8], &mut fake_stack)) as usize;
+	let mut length_stack = VecDeque::<usize>::with_capacity(stack_len);
 
 	if stack_len > 0 {
 		for idx in 0..stack_len {

From 53859e2bf50052850f06a9b3a720e317f6278bac Mon Sep 17 00:00:00 2001
From: Nikolay Volf
Date: Thu, 26 May 2016 12:18:51 +0200
Subject: [PATCH 02/13] flush work

---
 db/src/database.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++--
 db/src/traits.rs   | 24 +++++++++++++++++-
 2 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/db/src/database.rs b/db/src/database.rs
index 23655524bc8..079493761fd 100644
--- a/db/src/database.rs
+++ b/db/src/database.rs
@@ -25,7 +25,7 @@ use std::convert::From;
 use ipc::IpcConfig;
 use std::mem;
 use ipc::binary::BinaryConvertError;
-use std::collections::VecDeque;
+use std::collections::{VecDeque, HashMap};
 
 impl From<String> for Error {
 	fn from(s: String) -> Error {
@@ -33,10 +33,66 @@
 	}
 }
 
+pub struct WriteQue {
+	cache: HashMap<Vec<u8>, Vec<u8>>,
+	write_log: VecDeque<Vec<u8>>,
+	cache_len: usize,
+}
+
+impl WriteQue {
+	fn new(cache_len: usize) -> WriteQue {
+		WriteQue {
+			cache: HashMap::new(),
+			write_log: VecDeque::new(),
+			cache_len: cache_len,
+		}
+	}
+
+	fn write(&mut self, key: Vec<u8>, val: Vec<u8>) {
+		self.cache.insert(key.clone(), val);
+		self.write_log.push_back(key);
+	}
+
+	fn remove(&mut self, key: Vec<u8>) {
+		self.cache.remove(&key);
+		self.write_log.push_back(key);
+	}
+
+	fn flush(&mut self, db: &mut DB, keys: usize) {
+		let mut so_far = 0;
+		loop {
+			if so_far == keys { break; }
+			let next = self.write_log.pop_front();
+			if next.is_none() { break; }
+			let next = next.unwrap();
+			if self.cache.len() > self.cache_len {
+				let key_cache_removed = self.cache.remove(&next);
+				if key_cache_removed.is_some() {
+					db.put(&next, &key_cache_removed.unwrap());
+				}
+				else {
+					db.delete(&next);
+				}
+			}
+			else {
+				let key_persisted = self.cache.get(&next);
+				if key_persisted.is_some() {
+					db.put(&next, &key_persisted.unwrap());
+				}
+				else {
+					db.delete(&next);
+				}
+			}
+			so_far = so_far + 1;
+		}
+	}
+}
+
 pub struct Database {
 	db: RwLock<Option<DB>>,
 	transactions: RwLock<BTreeMap<TransactionHandle, WriteBatch>>,
 	iterators: RwLock<BTreeMap<IteratorHandle, DBIterator>>,
+	write_que: RwLock<WriteQue>,
 }
 
 impl Database {
@@ -45,6 +101,7 @@
 			db: RwLock::new(None),
 			transactions: RwLock::new(BTreeMap::new()),
 			iterators: RwLock::new(BTreeMap::new()),
+			write_que: RwLock::new(WriteQue::new(DEFAULT_CACHE_LEN)),
 		}
 	}
 }
@@ -73,7 +130,7 @@ impl DatabaseService for Database {
 
 	/// Opens database in the specified path with the default config
 	fn open_default(&self, path: String) -> Result<(), Error> {
-		self.open(DatabaseConfig { prefix_size: None }, path)
+		self.open(DatabaseConfig::default(), path)
 	}
 
 	fn close(&self) -> Result<(), Error> {
diff --git a/db/src/traits.rs b/db/src/traits.rs
index 1a45fc1b9ab..63ceceae94b 100644
--- a/db/src/traits.rs
+++ b/db/src/traits.rs
@@ -9,6 +9,8 @@ use std::cell::RefCell;
 pub type TransactionHandle = u32;
 pub type IteratorHandle = u32;
 
+pub
const DEFAULT_CACHE_LEN: usize = 20480; + #[derive(Binary)] pub struct KeyValue { pub key: Vec, @@ -29,7 +31,27 @@ pub enum Error { #[derive(Binary)] pub struct DatabaseConfig { /// Optional prefix size in bytes. Allows lookup by partial key. - pub prefix_size: Option + pub prefix_size: Option, + /// write cache length + pub cache: usize, +} + +impl Default for DatabaseConfig { + fn default() -> DatabaseConfig { + DatabaseConfig { + prefix_size: None, + cache: DEFAULT_CACHE_LEN, + } + } +} + +impl DatabaseConfig { + fn with_prefix(prefix: usize) -> DatabaseConfig { + DatabaseConfig { + prefix_size: Some(prefix), + cache: DEFAULT_CACHE_LEN, + } + } } pub trait DatabaseService { From ac4351fda9cee269e8f6b609f6280ca7582b8404 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 26 May 2016 13:53:44 +0200 Subject: [PATCH 03/13] write que basic --- db/src/database.rs | 47 ++++++++++++++++++++++++++++++++-------------- db/src/traits.rs | 1 - 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/db/src/database.rs b/db/src/database.rs index 079493761fd..1e33801c7f7 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -39,6 +39,8 @@ pub struct WriteQue { cache_len: usize, } +const FLUSH_BATCH_SIZE: usize = 1048; + impl WriteQue { fn new(cache_len: usize) -> WriteQue { WriteQue { @@ -58,7 +60,7 @@ impl WriteQue { self.write_log.push_back(key); } - fn flush(&mut self, db: &mut DB, keys: usize) { + fn flush(&mut self, db: &DB, keys: usize) -> Result<(), Error> { let mut so_far = 0; loop { if so_far == keys { break; } @@ -68,23 +70,24 @@ impl WriteQue { if self.cache.len() > self.cache_len { let key_cache_removed = self.cache.remove(&next); if key_cache_removed.is_some() { - db.put(&next, &key_cache_removed.unwrap()); + try!(db.put(&next, &key_cache_removed.unwrap())); } else { - db.delete(&next); + try!(db.delete(&next)); } } else { let key_persisted = self.cache.get(&next); if key_persisted.is_some() { - db.put(&next, &key_persisted.unwrap()); + try!(db.put(&next, &key_persisted.unwrap())); } else { - db.delete(&next); + try!(db.delete(&next)); } } so_far = so_far + 1; } + Ok(()) } } @@ -104,6 +107,22 @@ impl Database { write_que: RwLock::new(WriteQue::new(DEFAULT_CACHE_LEN)), } } + + pub fn flush(&self) -> Result<(), Error> { + let mut que = self.write_que.write().unwrap(); + let db_lock = self.db.read().unwrap(); + if db_lock.is_none() { return Ok(()); } + let db = db_lock.as_ref().unwrap(); + + try!(que.flush(&db, FLUSH_BATCH_SIZE)); + Ok(()) + } +} + +impl Drop for Database { + fn drop(&mut self) { + self.flush().unwrap(); + } } #[derive(Ipc)] @@ -353,7 +372,7 @@ mod test { fn can_be_open_empty() { let db = Database::new(); let path = RandomTempPath::create_dir(); - db.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); assert!(db.is_empty().is_ok()); } @@ -362,7 +381,7 @@ mod test { fn can_store_key() { let db = Database::new(); let path = RandomTempPath::create_dir(); - db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); assert!(!db.is_empty().unwrap()); @@ -372,11 +391,11 @@ mod test { fn can_retrieve() { let db = Database::new(); let path = RandomTempPath::create_dir(); - db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); 
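		// The close()/open_default() round-trip below is the real check: once
		// writes are routed through the in-memory queue, close() must drain
		// every pending entry into RocksDB or this value would be lost.
		// Read-your-write visibility holds even before that flush, so a
		// hypothetical extra assertion would already pass at this point:
		//   assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec());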
db.close().unwrap(); - db.open(DatabaseConfig { prefix_size: None }, path.as_str().to_owned()).unwrap(); + db.open_default(path.as_str().to_owned()).unwrap(); assert_eq!(db.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); } } @@ -444,7 +463,7 @@ mod client_tests { while !worker_is_ready.load(Ordering::Relaxed) { } let client = nanoipc::init_duplex_client::>(url).unwrap(); - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); assert!(client.is_empty().unwrap()); worker_should_exit.store(true, Ordering::Relaxed); } @@ -480,7 +499,7 @@ mod client_tests { client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); client.close().unwrap(); - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); stop.store(true, Ordering::Relaxed); @@ -497,7 +516,7 @@ mod client_tests { run_worker(scope, stop.clone(), url); let client = nanoipc::init_client::>(url).unwrap(); - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); assert!(client.get("xxx".as_bytes()).unwrap().is_none()); stop.store(true, Ordering::Relaxed); @@ -521,7 +540,7 @@ mod client_tests { client.close().unwrap(); - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); stop.store(true, Ordering::Relaxed); @@ -545,7 +564,7 @@ mod client_tests { client.close().unwrap(); - client.open(DatabaseConfig { prefix_size: Some(8) }, path.as_str().to_owned()).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); stop.store(true, Ordering::Relaxed); diff --git a/db/src/traits.rs b/db/src/traits.rs index 63ceceae94b..4ed85d6747a 100644 --- a/db/src/traits.rs +++ b/db/src/traits.rs @@ -1,6 +1,5 @@ //! 
Ethcore database trait -use ipc::BinaryConvertable; use std::mem; use ipc::binary::BinaryConvertError; use std::collections::VecDeque; From 07addff506aaf35bc727963fea57c44d35fa6e04 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 26 May 2016 18:15:13 +0200 Subject: [PATCH 04/13] extra benches --- db/benches/db.rs | 29 +++++++ db/src/database.rs | 185 ++++++++++++++++++--------------------------- db/src/lib.rs.in | 4 +- db/src/traits.rs | 27 ++----- 4 files changed, 110 insertions(+), 135 deletions(-) diff --git a/db/benches/db.rs b/db/benches/db.rs index 2a8f0323f4d..f0843012fe0 100644 --- a/db/benches/db.rs +++ b/db/benches/db.rs @@ -55,3 +55,32 @@ fn key_write_direct(bencher: &mut Bencher) { client.put(devtools::random_str(2048).as_bytes(), devtools::random_str(2048).as_bytes()).unwrap(); }); } + +#[bench] +fn key_write_read_ipc(bencher: &mut Bencher) { + crossbeam::scope(|scope| { + let stop = devtools::StopGuard::new(); + let temp = devtools::RandomTempPath::create_dir(); + let ipc_url = ethcore_db::extras_service_url(temp.as_str()).unwrap(); + ethcore_db::run_worker(&scope, stop.share(), &ipc_url); + let client = nanoipc::init_client::>(&ipc_url).unwrap(); + client.open_default(temp.as_str().to_owned()).unwrap(); + bencher.iter(|| { + let mut batch = Vec::new(); + for _ in 0..100 { + batch.push((devtools::random_str(256).as_bytes().to_vec(), devtools::random_str(256).as_bytes().to_vec())); + batch.push((devtools::random_str(256).as_bytes().to_vec(), devtools::random_str(2048).as_bytes().to_vec())); + batch.push((devtools::random_str(2048).as_bytes().to_vec(), devtools::random_str(2048).as_bytes().to_vec())); + batch.push((devtools::random_str(2048).as_bytes().to_vec(), devtools::random_str(256).as_bytes().to_vec())); + } + + for &(ref k, ref v) in batch.iter() { + client.put(k, v).unwrap(); + } + + for &(ref k, ref v) in batch.iter() { + assert_eq!(v, &client.get(k).unwrap().unwrap()); + } + }); + }); +} diff --git a/db/src/database.rs b/db/src/database.rs index 1e33801c7f7..130992d04f5 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -60,8 +60,13 @@ impl WriteQue { self.write_log.push_back(key); } + fn get(&self, key: &Vec) -> Option> { + self.cache.get(key).and_then(|vec_ref| Some(vec_ref.clone())) + } + fn flush(&mut self, db: &DB, keys: usize) -> Result<(), Error> { let mut so_far = 0; + let batch = WriteBatch::new(); loop { if so_far == keys { break; } let next = self.write_log.pop_front(); @@ -70,30 +75,34 @@ impl WriteQue { if self.cache.len() > self.cache_len { let key_cache_removed = self.cache.remove(&next); if key_cache_removed.is_some() { - try!(db.put(&next, &key_cache_removed.unwrap())); + try!(batch.put(&next, &key_cache_removed.unwrap())); } else { - try!(db.delete(&next)); + try!(batch.delete(&next)); } } else { let key_persisted = self.cache.get(&next); if key_persisted.is_some() { - try!(db.put(&next, &key_persisted.unwrap())); + try!(batch.put(&next, &key_persisted.unwrap())); } else { - try!(db.delete(&next)); + try!(batch.delete(&next)); } } so_far = so_far + 1; } + db.write(batch); Ok(()) } + + fn is_empty(&self) -> bool { + self.write_log.is_empty() + } } pub struct Database { db: RwLock>, - transactions: RwLock>, iterators: RwLock>, write_que: RwLock, } @@ -102,7 +111,6 @@ impl Database { pub fn new() -> Database { Database { db: RwLock::new(None), - transactions: RwLock::new(BTreeMap::new()), iterators: RwLock::new(BTreeMap::new()), write_que: RwLock::new(WriteQue::new(DEFAULT_CACHE_LEN)), } @@ -117,6 +125,18 @@ impl Database { 
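		// Lock-order note: the queue write lock is taken before the db read
		// lock, and both stay held for the whole batch below, so a concurrent
		// close() (which needs the db write lock) can never observe a
		// half-flushed queue.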
try!(que.flush(&db, FLUSH_BATCH_SIZE)); Ok(()) } + + pub fn flush_all(&self) -> Result<(), Error> { + let mut que = self.write_que.write().unwrap(); + let db_lock = self.db.read().unwrap(); + if db_lock.is_none() { return Ok(()); } + let db = db_lock.as_ref().unwrap(); + + while !que.is_empty() { + try!(que.flush(&db, FLUSH_BATCH_SIZE)); + } + Ok(()) + } } impl Drop for Database { @@ -153,45 +173,28 @@ impl DatabaseService for Database { } fn close(&self) -> Result<(), Error> { + self.flush_all(); + let mut db = self.db.write().unwrap(); if db.is_none() { return Err(Error::IsClosed); } - // TODO: wait for transactions to expire/close here? - if self.transactions.read().unwrap().len() > 0 { return Err(Error::UncommitedTransactions); } - *db = None; Ok(()) } fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { - let db_lock = self.db.read().unwrap(); - let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); - - try!(db.put(key, value)); + let mut que_lock = self.write_que.write().unwrap(); + que_lock.write(key.to_vec(), value.to_vec()); Ok(()) } fn delete(&self, key: &[u8]) -> Result<(), Error> { - let db_lock = self.db.read().unwrap(); - let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); - - try!(db.delete(key)); - Ok(()) - } - - fn write(&self, handle: TransactionHandle) -> Result<(), Error> { - let db_lock = self.db.read().unwrap(); - let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); - - let mut transactions = self.transactions.write().unwrap(); - let batch = try!( - transactions.remove(&handle).ok_or(Error::TransactionUnknown) - ); - try!(db.write(batch)); + let mut que_lock = self.write_que.write().unwrap(); + que_lock.remove(key.to_vec()); Ok(()) } - fn write_client(&self, transaction: DBClientTransaction) -> Result<(), Error> { + fn write(&self, transaction: DBTransaction) -> Result<(), Error> { let db_lock = self.db.read().unwrap(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); @@ -207,11 +210,21 @@ impl DatabaseService for Database { } fn get(&self, key: &[u8]) -> Result>, Error> { + { + let key_vec = key.to_vec(); + let cache_hit = self.write_que.read().unwrap().get(&key_vec); + + if cache_hit.is_some() { + return Ok(Some(cache_hit.unwrap())) + } + } let db_lock = self.db.read().unwrap(); let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); match try!(db.get(key)) { - Some(db_vec) => Ok(Some(db_vec.to_vec())), + Some(db_vec) => { + Ok(Some(db_vec.to_vec())) + }, None => Ok(None), } } @@ -266,75 +279,11 @@ impl DatabaseService for Database { iterators.remove(&handle); Ok(()) } - - fn dispose_transaction(&self, handle: TransactionHandle) -> Result<(), Error> { - let mut transactions = self.transactions.write().unwrap(); - transactions.remove(&handle); - Ok(()) - } - - - fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error> - { - let mut transactions = self.transactions.write().unwrap(); - let batch = try!( - transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown) - ); - try!(batch.put(&key, &value)); - Ok(()) - } - - fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error> { - let mut transactions = self.transactions.write().unwrap(); - let batch = try!( - transactions.get_mut(&transaction).ok_or(Error::TransactionUnknown) - ); - try!(batch.delete(&key)); - Ok(()) - } - - fn new_transaction(&self) -> TransactionHandle { - let mut transactions = self.transactions.write().unwrap(); - let next_transaction = transactions.keys().last().unwrap_or(&0) + 1; - 
transactions.insert(next_transaction, WriteBatch::new()); - - next_transaction - } } // TODO : put proper at compile-time impl IpcConfig for Database {} -/// Write transaction. Batches a sequence of put/delete operations for efficiency. -pub struct DBTransaction { - client: Arc>, - handle: TransactionHandle, -} - -impl DBTransaction { - /// Create new transaction. - pub fn new(client: &Arc>) -> Result { - let client_ref = client.clone(); - let new_handle = client_ref.new_transaction(); - Ok(DBTransaction { client: client_ref, handle: new_handle }) - } - - /// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write. - pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { - self.client.transaction_put(self.handle, key, value) - } - - /// Delete value by key. - pub fn delete(&self, key: &[u8]) -> Result<(), Error> { - self.client.transaction_delete(self.handle, key) - } - - /// Commits transaction - pub fn commit(self) -> Result<(), Error> { - self.client.write(self.handle) - } -} - /// Database iterator pub struct DatabaseIterator { client: Arc>, @@ -384,6 +333,7 @@ mod test { db.open_default(path.as_str().to_owned()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); + db.flush_all(); assert!(!db.is_empty().unwrap()); } @@ -402,7 +352,7 @@ mod test { #[cfg(test)] mod client_tests { - use super::{DatabaseClient, Database, DBTransaction}; + use super::{DatabaseClient, Database}; use traits::*; use devtools::*; use nanoipc; @@ -523,9 +473,10 @@ mod client_tests { }); } + #[test] - fn can_create_transaction() { - let url = "ipc:///tmp/parity-db-ipc-test-50.ipc"; + fn can_commit_client_transaction() { + let url = "ipc:///tmp/parity-db-ipc-test-60.ipc"; let path = RandomTempPath::create_dir(); crossbeam::scope(move |scope| { @@ -534,9 +485,9 @@ mod client_tests { let client = nanoipc::init_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); - let transaction = DBTransaction::new(&client.service()).unwrap(); - transaction.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); - transaction.commit().unwrap(); + let transaction = DBTransaction::new(); + transaction.put("xxx".as_bytes(), "1".as_bytes()); + client.write(transaction).unwrap(); client.close().unwrap(); @@ -548,26 +499,34 @@ mod client_tests { } #[test] - fn can_commit_client_transaction() { - let url = "ipc:///tmp/parity-db-ipc-test-60.ipc"; + fn key_write_read_ipc() { + let url = "ipc:///tmp/parity-db-ipc-test-70.ipc"; let path = RandomTempPath::create_dir(); - crossbeam::scope(move |scope| { - let stop = Arc::new(AtomicBool::new(false)); - run_worker(scope, stop.clone(), url); + crossbeam::scope(|scope| { + let stop = StopGuard::new(); + run_worker(&scope, stop.share(), url); + let client = nanoipc::init_client::>(url).unwrap(); - client.open_default(path.as_str().to_owned()).unwrap(); - let transaction = DBClientTransaction::new(); - transaction.put("xxx".as_bytes(), "1".as_bytes()); - client.write_client(transaction).unwrap(); + client.open_default(path.as_str().to_owned()).unwrap(); + let mut batch = Vec::new(); + for _ in 0..100 { + batch.push((random_str(256).as_bytes().to_vec(), random_str(256).as_bytes().to_vec())); + batch.push((random_str(256).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec())); + batch.push((random_str(2048).as_bytes().to_vec(), random_str(2048).as_bytes().to_vec())); + batch.push((random_str(2048).as_bytes().to_vec(), random_str(256).as_bytes().to_vec())); + } + for &(ref k, ref v) in batch.iter() { + 
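			// Each put below is one nanomsg round-trip into the service-side
			// write queue; nothing reaches RocksDB until close() forces a
			// full flush, yet the read-back loop after reopening must still
			// find every value.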
client.put(k, v).unwrap(); + } client.close().unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); - assert_eq!(client.get("xxx".as_bytes()).unwrap().unwrap(), "1".as_bytes().to_vec()); - - stop.store(true, Ordering::Relaxed); + for &(ref k, ref v) in batch.iter() { + assert_eq!(v, &client.get(k).unwrap().unwrap()); + } }); } } diff --git a/db/src/lib.rs.in b/db/src/lib.rs.in index d0c56cedd65..eb57e4f3200 100644 --- a/db/src/lib.rs.in +++ b/db/src/lib.rs.in @@ -25,8 +25,8 @@ extern crate crossbeam; pub mod database; pub mod traits; -pub use traits::{DatabaseService, DBClientTransaction, Error}; -pub use database::{Database, DatabaseClient, DBTransaction, DatabaseIterator}; +pub use traits::{DatabaseService, DBTransaction, Error}; +pub use database::{Database, DatabaseClient, DatabaseIterator}; use std::sync::Arc; use std::sync::atomic::*; diff --git a/db/src/traits.rs b/db/src/traits.rs index 4ed85d6747a..d1fec666df1 100644 --- a/db/src/traits.rs +++ b/db/src/traits.rs @@ -5,7 +5,6 @@ use ipc::binary::BinaryConvertError; use std::collections::VecDeque; use std::cell::RefCell; -pub type TransactionHandle = u32; pub type IteratorHandle = u32; pub const DEFAULT_CACHE_LEN: usize = 20480; @@ -69,18 +68,6 @@ pub trait DatabaseService { /// Delete value by key. fn delete(&self, key: &[u8]) -> Result<(), Error>; - /// Insert a key-value pair in the transaction. Any existing value value will be overwritten. - fn transaction_put(&self, transaction: TransactionHandle, key: &[u8], value: &[u8]) -> Result<(), Error>; - - /// Delete value by key using transaction - fn transaction_delete(&self, transaction: TransactionHandle, key: &[u8]) -> Result<(), Error>; - - /// Commit transaction to database. - fn write(&self, tr: TransactionHandle) -> Result<(), Error>; - - /// Initiate new transaction on database - fn new_transaction(&self) -> TransactionHandle; - /// Get value by key. 
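	/// A key with a pending write in the service's in-memory queue is served
	/// straight from that queue; only a queue miss falls through to the
	/// backing store.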
fn get(&self, key: &[u8]) -> Result>, Error>; @@ -96,22 +83,22 @@ pub trait DatabaseService { /// Next key-value for the the given iterator fn iter_next(&self, iterator: IteratorHandle) -> Option; + /// Dispose iteration that is no longer needed fn dispose_iter(&self, handle: IteratorHandle) -> Result<(), Error>; - fn dispose_transaction(&self, handle: TransactionHandle) -> Result<(), Error>; - - fn write_client(&self, transaction: DBClientTransaction) -> Result<(), Error>; + /// Write client transaction + fn write(&self, transaction: DBTransaction) -> Result<(), Error>; } #[derive(Binary)] -pub struct DBClientTransaction { +pub struct DBTransaction { pub writes: RefCell>, pub removes: RefCell>>, } -impl DBClientTransaction { - pub fn new() -> DBClientTransaction { - DBClientTransaction { +impl DBTransaction { + pub fn new() -> DBTransaction { + DBTransaction { writes: RefCell::new(Vec::new()), removes: RefCell::new(Vec::new()), } From d77236406d0cdaaabb8db666a43f4f65cddb3502 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 26 May 2016 18:36:04 +0200 Subject: [PATCH 05/13] transaction writes also qued --- db/src/database.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/db/src/database.rs b/db/src/database.rs index 130992d04f5..2dc912660e8 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -195,17 +195,17 @@ impl DatabaseService for Database { } fn write(&self, transaction: DBTransaction) -> Result<(), Error> { - let db_lock = self.db.read().unwrap(); - let db = try!(db_lock.as_ref().ok_or(Error::IsClosed)); + let mut que_lock = self.write_que.write().unwrap(); - let batch = WriteBatch::new(); - for ref kv in transaction.writes.borrow().iter() { - try!(batch.put(&kv.key, &kv.value)) + let mut writes = transaction.writes.borrow_mut(); + for kv in writes.drain(..) { + que_lock.write(kv.key, kv.value); } - for ref k in transaction.removes.borrow().iter() { - try!(batch.delete(k)); + + let mut removes = transaction.removes.borrow_mut(); + for k in removes.drain(..) { + que_lock.remove(k); } - try!(db.write(batch)); Ok(()) } From a3af78d0f7163dd7ed19359ce03f9f4d81338c14 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 26 May 2016 19:09:53 +0200 Subject: [PATCH 06/13] get rid of server-side transactions --- ethcore/src/blockchain/blockchain.rs | 10 +++++----- ethcore/src/db.rs | 9 --------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index a06b38c538d..d33e7f6259c 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -316,12 +316,12 @@ impl BlockChain { bc.blocks_db.put(&hash, genesis).unwrap(); - let batch = ethcore_db::DBTransaction::new(&bc.extras_db.service()).unwrap(); + let batch = ethcore_db::DBTransaction::new(); batch.write(&hash, &details); batch.write(&header.number(), &hash); - batch.put(b"best", &hash).unwrap(); - batch.commit().unwrap(); + batch.put(b"best", &hash); + bc.extras_db.write(batch).unwrap(); hash } }; @@ -464,7 +464,7 @@ impl BlockChain { /// Applies extras update. 
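	/// All extras writes for the update are batched into one client-side
	/// `DBTransaction` and shipped to the database service in a single
	/// `write` call; a minimal sketch of the pattern used in the body below:
	///
	/// ```ignore
	/// let batch = ethcore_db::DBTransaction::new();
	/// batch.put(b"best", &update.info.hash);
	/// self.extras_db.write(batch).unwrap();
	/// ```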
fn apply_update(&self, update: ExtrasUpdate) { - let batch = ethcore_db::DBClientTransaction::new(); + let batch = ethcore_db::DBTransaction::new(); batch.put(b"best", &update.info.hash); { @@ -508,7 +508,7 @@ impl BlockChain { batch.extend_with_cache(&mut write_txs, update.transactions_addresses, CacheUpdatePolicy::Remove); // update extras database - self.extras_db.write_client(batch).unwrap(); + self.extras_db.write(batch).unwrap(); } } diff --git a/ethcore/src/db.rs b/ethcore/src/db.rs index a00e5f9fe10..861e50299c3 100644 --- a/ethcore/src/db.rs +++ b/ethcore/src/db.rs @@ -159,15 +159,6 @@ impl Readable for Database { } impl Writable for ethcore_db::DBTransaction { - fn write(&self, key: &Key, value: &T) where T: Encodable, R: Deref { - let result = self.put(&key.key(), &encode(value)); - if let Err(err) = result { - panic!("db put failed, key: {:?}, err: {:?}", &key.key() as &[u8], err); - } - } -} - -impl Writable for ethcore_db::DBClientTransaction { fn write(&self, key: &Key, value: &T) where T: Encodable, R: Deref { self.put(&key.key(), &encode(value)); } From ce5b90845d1283f695f0f3f07a56762e0d6d01cf Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 26 May 2016 21:44:17 +0200 Subject: [PATCH 07/13] removes/deletes split --- db/src/database.rs | 85 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/db/src/database.rs b/db/src/database.rs index 2dc912660e8..523077db258 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -25,7 +25,7 @@ use std::convert::From; use ipc::IpcConfig; use std::mem; use ipc::binary::BinaryConvertError; -use std::collections::{VecDeque, HashMap}; +use std::collections::{VecDeque, HashMap, HashSet}; impl From for Error { fn from(s: String) -> Error { @@ -33,9 +33,19 @@ impl From for Error { } } +enum LogEntryKind { + Write, + Remove, +} + +struct WriteLogEntry { + kind: LogEntryKind, + key: Vec, +} + pub struct WriteQue { cache: HashMap, Vec>, - write_log: VecDeque>, + write_log: VecDeque, cache_len: usize, } @@ -52,12 +62,12 @@ impl WriteQue { fn write(&mut self, key: Vec, val: Vec) { self.cache.insert(key.clone(), val); - self.write_log.push_back(key); + self.write_log.push_back(WriteLogEntry { key: key, kind: LogEntryKind::Write }); } fn remove(&mut self, key: Vec) { self.cache.remove(&key); - self.write_log.push_back(key); + self.write_log.push_back(WriteLogEntry { key: key, kind: LogEntryKind::Remove }); } fn get(&self, key: &Vec) -> Option> { @@ -67,32 +77,63 @@ impl WriteQue { fn flush(&mut self, db: &DB, keys: usize) -> Result<(), Error> { let mut so_far = 0; let batch = WriteBatch::new(); + let mut effective_removes: HashSet> = HashSet::new(); + let mut effective_writes: HashSet> = HashSet::new(); loop { if so_far == keys { break; } let next = self.write_log.pop_front(); if next.is_none() { break; } let next = next.unwrap(); - if self.cache.len() > self.cache_len { - let key_cache_removed = self.cache.remove(&next); - if key_cache_removed.is_some() { - try!(batch.put(&next, &key_cache_removed.unwrap())); - } - else { - try!(batch.delete(&next)); + + match next.kind { + LogEntryKind::Write => { + effective_removes.remove(&next.key); + effective_writes.insert(next.key); + }, + LogEntryKind::Remove => { + effective_writes.remove(&next.key); + effective_removes.insert(next.key); } } + +// if self.cache.len() > self.cache_len { +// let key_cache_removed = self.cache.remove(&next); +// if key_cache_removed.is_some() { +// try!(batch.put(&next, &key_cache_removed.unwrap())); +// } +// 
else { +// try!(batch.delete(&next)); +// } +// } +// else { +// let key_persisted = self.cache.get(&next); +// if key_persisted.is_some() { +// try!(batch.put(&next, &key_persisted.unwrap())); +// } +// else { +// try!(batch.delete(&next)); +// } +// } + so_far = so_far + 1; + } + + for key in effective_writes.drain() { + if self.cache.len() > self.cache_len { + let key_cache_removed = self.cache.remove(&key); + try!(batch.put(&key, &key_cache_removed.unwrap())); + } else { - let key_persisted = self.cache.get(&next); - if key_persisted.is_some() { - try!(batch.put(&next, &key_persisted.unwrap())); - } - else { - try!(batch.delete(&next)); - } + let key_persisted = self.cache.get(&key); + try!(batch.put(&key, &key_persisted.unwrap())); } - so_far = so_far + 1; } - db.write(batch); + + for key in effective_removes.drain() { + self.cache.remove(&key); + try!(batch.delete(&key)); + } + + try!(db.write(batch)); Ok(()) } @@ -173,7 +214,7 @@ impl DatabaseService for Database { } fn close(&self) -> Result<(), Error> { - self.flush_all(); + try!(self.flush_all()); let mut db = self.db.write().unwrap(); if db.is_none() { return Err(Error::IsClosed); } @@ -333,7 +374,7 @@ mod test { db.open_default(path.as_str().to_owned()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); - db.flush_all(); + db.flush_all().unwrap(); assert!(!db.is_empty().unwrap()); } From d1c8bf5c9066f96a8867163cc6866bc02211e383 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 26 May 2016 21:50:56 +0200 Subject: [PATCH 08/13] remove comments --- db/src/database.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/db/src/database.rs b/db/src/database.rs index 523077db258..e80ecc2f173 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -95,25 +95,6 @@ impl WriteQue { effective_removes.insert(next.key); } } - -// if self.cache.len() > self.cache_len { -// let key_cache_removed = self.cache.remove(&next); -// if key_cache_removed.is_some() { -// try!(batch.put(&next, &key_cache_removed.unwrap())); -// } -// else { -// try!(batch.delete(&next)); -// } -// } -// else { -// let key_persisted = self.cache.get(&next); -// if key_persisted.is_some() { -// try!(batch.put(&next, &key_persisted.unwrap())); -// } -// else { -// try!(batch.delete(&next)); -// } -// } so_far = so_far + 1; } From dab7598c669e0a4c1fe7f52b45d0b1a9aea402a1 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 27 May 2016 13:17:55 +0200 Subject: [PATCH 09/13] write que flush workrers --- Cargo.lock | 20 ++++++++++++++++++++ Cargo.toml | 2 ++ db/src/database.rs | 10 +++++++--- parity/db/main.rs | 47 +++++++++++++++++++++++++++++++++++++--------- 4 files changed, 67 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 07ecf62864e..2a8d4e13367 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,8 @@ dependencies = [ "rpassword 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "schedule_recv 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "scoped_threadpool 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "syntex 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", @@ -656,6 +658,11 @@ name = "lazy_static" version = "0.1.16" source = 
"registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "lazy_static" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "libc" version = "0.1.12" @@ -1081,6 +1088,19 @@ dependencies = [ "semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "schedule_recv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "scoped_threadpool" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "semver" version = "0.1.20" diff --git a/Cargo.toml b/Cargo.toml index 081cf170e1f..4f8004ca1a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,8 @@ ethcore-db = { path = "db" } ethcore-ipc-hypervisor = { path = "ipc/hypervisor" } crossbeam = "0.2" ansi_term = "0.7" +schedule_recv = "0.1" +scoped_threadpool = "0.1" [dependencies.hyper] version = "0.8" diff --git a/db/src/database.rs b/db/src/database.rs index e80ecc2f173..c645eaa22e3 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -93,7 +93,7 @@ impl WriteQue { LogEntryKind::Remove => { effective_writes.remove(&next.key); effective_removes.insert(next.key); - } + }, } so_far = so_far + 1; } @@ -125,15 +125,19 @@ impl WriteQue { pub struct Database { db: RwLock>, - iterators: RwLock>, + /// Iterators - dont't use between threads! + iterators: RwLock>, write_que: RwLock, } +unsafe impl Send for Database {} +unsafe impl Sync for Database {} + impl Database { pub fn new() -> Database { Database { db: RwLock::new(None), - iterators: RwLock::new(BTreeMap::new()), + iterators: RwLock::new(HashMap::new()), write_que: RwLock::new(WriteQue::new(DEFAULT_CACHE_LEN)), } } diff --git a/parity/db/main.rs b/parity/db/main.rs index 1b8e2087c26..ab834f427e6 100644 --- a/parity/db/main.rs +++ b/parity/db/main.rs @@ -24,13 +24,18 @@ extern crate ethcore_ipc_hypervisor as hypervisor; extern crate ctrlc; extern crate ethcore_devtools as devtools; #[macro_use] extern crate log; +extern crate schedule_recv; +extern crate scoped_threadpool; +use scoped_threadpool::Pool; use db::database::Database; use docopt::Docopt; use std::sync::Arc; use hypervisor::{HypervisorServiceClient, BLOCKCHAIN_MODULE_ID, HYPERVISOR_IPC_URL}; use ctrlc::CtrlC; use std::sync::atomic::*; +use schedule_recv::periodic; +use std::time::Duration; const USAGE: &'static str = " Ethcore database service @@ -44,8 +49,8 @@ struct Args { arg_path: String, } -fn init_worker(addr: &str) -> nanoipc::Worker { - let mut worker = nanoipc::Worker::::new(&Arc::new(Database::new())); +fn init_worker(addr: &str, service: Arc) -> nanoipc::Worker { + let mut worker = nanoipc::Worker::::new(&service); worker.add_reqrep(addr).unwrap(); worker } @@ -56,22 +61,32 @@ fn main() { .unwrap_or_else(|e| e.exit()); info!("Database: {}", args.arg_path); - let blocks_url = db::blocks_service_url(&args.arg_path).unwrap(); + info!("Blocks service url: {}", blocks_url); let extras_url = db::extras_service_url(&args.arg_path).unwrap(); + info!("Extras service url: {}", extras_url); let stop = Arc::new(AtomicBool::new(false)); - let extras_stop = stop.clone(); - let main_stop = stop.clone(); + let extras_service = Arc::new(Database::new()); + let extras_service_worker = extras_service.clone(); + let extras_stop = stop.clone(); std::thread::spawn(move || { - let mut extras_db_worker = init_worker(&extras_url); + let 
mut extras_db_worker = init_worker(&extras_url, extras_service_worker); while !extras_stop.load(Ordering::Relaxed) { extras_db_worker.poll(); } }); - let mut blocks_db_worker = init_worker(&blocks_url); + let blocks_service = Arc::new(Database::new()); + let blocks_service_worker = blocks_service.clone(); + let blocks_stop = stop.clone(); + std::thread::spawn(move || { + let mut blocks_db_worker = init_worker(&blocks_url, blocks_service_worker); + while !blocks_stop.load(Ordering::Relaxed) { + blocks_db_worker.poll(); + } + }); let hypervisor_client = nanoipc::init_client::>(HYPERVISOR_IPC_URL).unwrap(); hypervisor_client.handshake().unwrap(); @@ -82,7 +97,21 @@ fn main() { stop.store(true, Ordering::Relaxed); }); - while !main_stop.load(Ordering::Relaxed) { - blocks_db_worker.poll(); + let mut thread_pool = Pool::new(3); + let tick = periodic(Duration::from_millis(3000)); + loop { +// std ::thread::sleep(std::time::Duration::from_millis(5000)); + tick.recv().unwrap(); + thread_pool.scoped(|scope| { + let blocks_service_ref = blocks_service.clone(); + let extras_service_ref = extras_service.clone(); + scope.execute(move || { + blocks_service_ref.flush().unwrap(); + }); + scope.execute(move || { + extras_service_ref.flush().unwrap(); + }) + }); + println!("tick"); } } From 9a490e707cb6ca49d52b0d01d7e9e701f4e873f9 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 27 May 2016 15:27:39 +0200 Subject: [PATCH 10/13] write que partial flush removed --- db/src/database.rs | 50 +++++++++++++++++++++++++++------------------- parity/db/main.rs | 7 +++++++ 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/db/src/database.rs b/db/src/database.rs index c645eaa22e3..20eb35ddc82 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -19,7 +19,6 @@ use traits::*; use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBIterator, IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; -use std::collections::BTreeMap; use std::sync::{RwLock, Arc}; use std::convert::From; use ipc::IpcConfig; @@ -74,13 +73,11 @@ impl WriteQue { self.cache.get(key).and_then(|vec_ref| Some(vec_ref.clone())) } - fn flush(&mut self, db: &DB, keys: usize) -> Result<(), Error> { - let mut so_far = 0; + fn flush(&mut self, db: &DB) -> Result<(), Error> { let batch = WriteBatch::new(); let mut effective_removes: HashSet> = HashSet::new(); let mut effective_writes: HashSet> = HashSet::new(); loop { - if so_far == keys { break; } let next = self.write_log.pop_front(); if next.is_none() { break; } let next = next.unwrap(); @@ -95,12 +92,15 @@ impl WriteQue { effective_removes.insert(next.key); }, } - so_far = so_far + 1; } for key in effective_writes.drain() { if self.cache.len() > self.cache_len { let key_cache_removed = self.cache.remove(&key); + + // it was already updated with the most recent value + if key_cache_removed.is_none() { continue; } + try!(batch.put(&key, &key_cache_removed.unwrap())); } else { @@ -148,19 +148,7 @@ impl Database { if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().unwrap(); - try!(que.flush(&db, FLUSH_BATCH_SIZE)); - Ok(()) - } - - pub fn flush_all(&self) -> Result<(), Error> { - let mut que = self.write_que.write().unwrap(); - let db_lock = self.db.read().unwrap(); - if db_lock.is_none() { return Ok(()); } - let db = db_lock.as_ref().unwrap(); - - while !que.is_empty() { - try!(que.flush(&db, FLUSH_BATCH_SIZE)); - } + try!(que.flush(&db)); Ok(()) } } @@ -199,7 +187,7 @@ impl DatabaseService for Database { } fn close(&self) -> Result<(), Error> { - 
try!(self.flush_all()); + try!(self.flush()); let mut db = self.db.write().unwrap(); if db.is_none() { return Err(Error::IsClosed); } @@ -359,7 +347,7 @@ mod test { db.open_default(path.as_str().to_owned()).unwrap(); db.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); - db.flush_all().unwrap(); + db.flush().unwrap(); assert!(!db.is_empty().unwrap()); } @@ -376,6 +364,28 @@ mod test { } } +#[cfg(test)] +mod write_que_tests { + use super::Database; + use traits::*; + use devtools::*; + + #[test] + fn que_write_flush() { + let db = Database::new(); + let path = RandomTempPath::create_dir(); + + db.open_default(path.as_str().to_owned()).unwrap(); + db.put("100500".as_bytes(), "1".as_bytes()).unwrap(); + db.delete("100500".as_bytes()).unwrap(); + db.flush().unwrap(); + + let val = db.get("100500".as_bytes()).unwrap(); + assert!(val.is_none()); + } + +} + #[cfg(test)] mod client_tests { use super::{DatabaseClient, Database}; diff --git a/parity/db/main.rs b/parity/db/main.rs index ab834f427e6..5396dd1921a 100644 --- a/parity/db/main.rs +++ b/parity/db/main.rs @@ -92,9 +92,16 @@ fn main() { hypervisor_client.handshake().unwrap(); hypervisor_client.module_ready(BLOCKCHAIN_MODULE_ID); + + let blocks_service_term = blocks_service.clone(); + let extras_service_term = extras_service.clone(); CtrlC::set_handler(move || { std::thread::sleep(std::time::Duration::new(1, 0)); stop.store(true, Ordering::Relaxed); + println!("flushing writes..."); + + blocks_service_term.flush().unwrap(); + extras_service_term.flush().unwrap(); }); let mut thread_pool = Pool::new(3); From 3efa53cb1ebf5ce358601e66e0d1c58eb66727e3 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 27 May 2016 16:58:16 +0200 Subject: [PATCH 11/13] writeque with hashmap only --- db/src/database.rs | 118 +++++++++++++++++++++++---------------------- parity/db/main.rs | 8 ++- 2 files changed, 63 insertions(+), 63 deletions(-) diff --git a/db/src/database.rs b/db/src/database.rs index 20eb35ddc82..30186dd27e1 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -32,20 +32,14 @@ impl From for Error { } } -enum LogEntryKind { - Write, +enum WriteCacheEntry { Remove, -} - -struct WriteLogEntry { - kind: LogEntryKind, - key: Vec, + Write(Vec), } pub struct WriteQue { - cache: HashMap, Vec>, - write_log: VecDeque, - cache_len: usize, + cache: HashMap, WriteCacheEntry>, + preferred_len: usize, } const FLUSH_BATCH_SIZE: usize = 1048; @@ -54,72 +48,69 @@ impl WriteQue { fn new(cache_len: usize) -> WriteQue { WriteQue { cache: HashMap::new(), - write_log: VecDeque::new(), - cache_len: cache_len, + preferred_len: cache_len, } } fn write(&mut self, key: Vec, val: Vec) { - self.cache.insert(key.clone(), val); - self.write_log.push_back(WriteLogEntry { key: key, kind: LogEntryKind::Write }); + self.cache.entry(key).or_insert(WriteCacheEntry::Write(val)); } fn remove(&mut self, key: Vec) { - self.cache.remove(&key); - self.write_log.push_back(WriteLogEntry { key: key, kind: LogEntryKind::Remove }); + self.cache.entry(key).or_insert(WriteCacheEntry::Remove); } fn get(&self, key: &Vec) -> Option> { - self.cache.get(key).and_then(|vec_ref| Some(vec_ref.clone())) + self.cache.get(key).and_then( + |vec_ref| match vec_ref { + &WriteCacheEntry::Write(ref val) => Some(val.clone()), + &WriteCacheEntry::Remove => None + }) } - fn flush(&mut self, db: &DB) -> Result<(), Error> { + /// WriteQue should be locked for this + fn flush(&mut self, db: &DB, amount: usize) -> Result<(), Error> { let batch = WriteBatch::new(); - let mut effective_removes: HashSet> = 
HashSet::new(); - let mut effective_writes: HashSet> = HashSet::new(); - loop { - let next = self.write_log.pop_front(); - if next.is_none() { break; } - let next = next.unwrap(); - - match next.kind { - LogEntryKind::Write => { - effective_removes.remove(&next.key); - effective_writes.insert(next.key); - }, - LogEntryKind::Remove => { - effective_writes.remove(&next.key); - effective_removes.insert(next.key); - }, - } - } - - for key in effective_writes.drain() { - if self.cache.len() > self.cache_len { - let key_cache_removed = self.cache.remove(&key); - - // it was already updated with the most recent value - if key_cache_removed.is_none() { continue; } - - try!(batch.put(&key, &key_cache_removed.unwrap())); - } - else { - let key_persisted = self.cache.get(&key); - try!(batch.put(&key, &key_persisted.unwrap())); - } - } - - for key in effective_removes.drain() { - self.cache.remove(&key); - try!(batch.delete(&key)); + let mut removed_so_far = 0; + while removed_so_far < amount { + if self.cache.len() == 0 { break; } + let removed_key = { + let (key, cache_entry) = self.cache.iter().nth(0).unwrap(); + + match *cache_entry { + WriteCacheEntry::Write(ref val) => { + try!(batch.put(&key, val)); + }, + WriteCacheEntry::Remove => { + try!(batch.delete(&key)); + }, + } + key.clone() + }; + + self.cache.remove(&removed_key); + + removed_so_far = removed_so_far + 1; } + if removed_so_far > 0 { try!(db.write(batch)); } + Ok(()) + } - try!(db.write(batch)); + /// flushes until que is empty + fn flush_all(&mut self, db: &DB) -> Result<(), Error> { + while !self.is_empty() { try!(self.flush(db, FLUSH_BATCH_SIZE)); } Ok(()) } fn is_empty(&self) -> bool { - self.write_log.is_empty() + self.cache.is_empty() + } + + fn try_shrink(&mut self, db: &DB) -> Result<(), Error> { + if self.cache.len() > self.preferred_len { + try!(self.flush(db, FLUSH_BATCH_SIZE)); + } + Ok(()) } } @@ -148,9 +139,20 @@ impl Database { if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().unwrap(); - try!(que.flush(&db)); + try!(que.try_shrink(&db)); Ok(()) } + + pub fn flush_all(&self) -> Result<(), Error> { + let mut que = self.write_que.write().unwrap(); + let db_lock = self.db.read().unwrap(); + if db_lock.is_none() { return Ok(()); } + let db = db_lock.as_ref().unwrap(); + + try!(que.flush_all(&db)); + Ok(()) + + } } impl Drop for Database { diff --git a/parity/db/main.rs b/parity/db/main.rs index 5396dd1921a..afd0cb21d69 100644 --- a/parity/db/main.rs +++ b/parity/db/main.rs @@ -98,16 +98,15 @@ fn main() { CtrlC::set_handler(move || { std::thread::sleep(std::time::Duration::new(1, 0)); stop.store(true, Ordering::Relaxed); - println!("flushing writes..."); + info!("Flushing writes..."); - blocks_service_term.flush().unwrap(); - extras_service_term.flush().unwrap(); + blocks_service_term.flush_all().unwrap(); + extras_service_term.flush_all().unwrap(); }); let mut thread_pool = Pool::new(3); let tick = periodic(Duration::from_millis(3000)); loop { -// std ::thread::sleep(std::time::Duration::from_millis(5000)); tick.recv().unwrap(); thread_pool.scoped(|scope| { let blocks_service_ref = blocks_service.clone(); @@ -119,6 +118,5 @@ fn main() { extras_service_ref.flush().unwrap(); }) }); - println!("tick"); } } From 570a8997e943983419e6e6394466ef0800f5595b Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Fri, 27 May 2016 17:31:20 +0200 Subject: [PATCH 12/13] fix warnings and que->queue --- db/src/database.rs | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git 
a/db/src/database.rs b/db/src/database.rs index 30186dd27e1..b0fdd526bc0 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -24,7 +24,7 @@ use std::convert::From; use ipc::IpcConfig; use std::mem; use ipc::binary::BinaryConvertError; -use std::collections::{VecDeque, HashMap, HashSet}; +use std::collections::{VecDeque, HashMap}; impl From for Error { fn from(s: String) -> Error { @@ -37,16 +37,16 @@ enum WriteCacheEntry { Write(Vec), } -pub struct WriteQue { +pub struct WriteQueue { cache: HashMap, WriteCacheEntry>, preferred_len: usize, } const FLUSH_BATCH_SIZE: usize = 1048; -impl WriteQue { - fn new(cache_len: usize) -> WriteQue { - WriteQue { +impl WriteQueue { + fn new(cache_len: usize) -> WriteQueue { + WriteQueue { cache: HashMap::new(), preferred_len: cache_len, } @@ -118,7 +118,7 @@ pub struct Database { db: RwLock>, /// Iterators - dont't use between threads! iterators: RwLock>, - write_que: RwLock, + write_queue: RwLock, } unsafe impl Send for Database {} @@ -129,27 +129,27 @@ impl Database { Database { db: RwLock::new(None), iterators: RwLock::new(HashMap::new()), - write_que: RwLock::new(WriteQue::new(DEFAULT_CACHE_LEN)), + write_queue: RwLock::new(WriteQueue::new(DEFAULT_CACHE_LEN)), } } pub fn flush(&self) -> Result<(), Error> { - let mut que = self.write_que.write().unwrap(); + let mut queue = self.write_queue.write().unwrap(); let db_lock = self.db.read().unwrap(); if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().unwrap(); - try!(que.try_shrink(&db)); + try!(queue.try_shrink(&db)); Ok(()) } pub fn flush_all(&self) -> Result<(), Error> { - let mut que = self.write_que.write().unwrap(); + let mut queue = self.write_queue.write().unwrap(); let db_lock = self.db.read().unwrap(); if db_lock.is_none() { return Ok(()); } let db = db_lock.as_ref().unwrap(); - try!(que.flush_all(&db)); + try!(queue.flush_all(&db)); Ok(()) } @@ -189,7 +189,7 @@ impl DatabaseService for Database { } fn close(&self) -> Result<(), Error> { - try!(self.flush()); + try!(self.flush_all()); let mut db = self.db.write().unwrap(); if db.is_none() { return Err(Error::IsClosed); } @@ -199,28 +199,28 @@ impl DatabaseService for Database { } fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { - let mut que_lock = self.write_que.write().unwrap(); - que_lock.write(key.to_vec(), value.to_vec()); + let mut queue_lock = self.write_queue.write().unwrap(); + queue_lock.write(key.to_vec(), value.to_vec()); Ok(()) } fn delete(&self, key: &[u8]) -> Result<(), Error> { - let mut que_lock = self.write_que.write().unwrap(); - que_lock.remove(key.to_vec()); + let mut queue_lock = self.write_queue.write().unwrap(); + queue_lock.remove(key.to_vec()); Ok(()) } fn write(&self, transaction: DBTransaction) -> Result<(), Error> { - let mut que_lock = self.write_que.write().unwrap(); + let mut queue_lock = self.write_queue.write().unwrap(); let mut writes = transaction.writes.borrow_mut(); for kv in writes.drain(..) { - que_lock.write(kv.key, kv.value); + queue_lock.write(kv.key, kv.value); } let mut removes = transaction.removes.borrow_mut(); for k in removes.drain(..) 
{
-		que_lock.remove(k);
+		queue_lock.remove(k);
 	}
 	Ok(())
 }
@@ -228,7 +228,7 @@
 	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
 		{
 			let key_vec = key.to_vec();
-			let cache_hit = self.write_que.read().unwrap().get(&key_vec);
+			let cache_hit = self.write_queue.read().unwrap().get(&key_vec);

From 0faa687e1e327d684a6e5f7273ee2c122e7a8f87 Mon Sep 17 00:00:00 2001
From: Nikolay Volf
Date: Fri, 27 May 2016 19:54:41 +0200
Subject: [PATCH 13/13] fix flush & increase cache/flush ratio

---
 db/src/database.rs | 14 ++++++++------
 db/src/traits.rs   |  2 +-
 parity/db/main.rs  |  6 +++---
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/db/src/database.rs b/db/src/database.rs
index b0fdd526bc0..bd7567d4828 100644
--- a/db/src/database.rs
+++ b/db/src/database.rs
@@ -42,7 +42,7 @@ pub struct WriteQueue {
 	preferred_len: usize,
 }
 
-const FLUSH_BATCH_SIZE: usize = 1048;
+const FLUSH_BATCH_SIZE: usize = 4096;
 
 impl WriteQueue {
 	fn new(cache_len: usize) -> WriteQueue {
@@ -53,11 +53,11 @@ impl WriteQueue {
 	}
 
 	fn write(&mut self, key: Vec<u8>, val: Vec<u8>) {
-		self.cache.entry(key).or_insert(WriteCacheEntry::Write(val));
+		self.cache.insert(key, WriteCacheEntry::Write(val));
 	}
 
 	fn remove(&mut self, key: Vec<u8>) {
-		self.cache.entry(key).or_insert(WriteCacheEntry::Remove);
+		self.cache.insert(key, WriteCacheEntry::Remove);
 	}
 
 	fn get(&self, key: &Vec<u8>) -> Option<Vec<u8>> {
@@ -92,7 +92,9 @@ impl WriteQueue {
 
 			removed_so_far = removed_so_far + 1;
 		}
-		if removed_so_far > 0 { try!(db.write(batch)); }
+		if removed_so_far > 0 {
+			try!(db.write(batch));
+		}
 		Ok(())
 	}
@@ -349,7 +351,7 @@ mod test {
 
 		db.open_default(path.as_str().to_owned()).unwrap();
 		db.put("xxx".as_bytes(), "1".as_bytes()).unwrap();
-		db.flush().unwrap();
+		db.flush_all().unwrap();
 
 		assert!(!db.is_empty().unwrap());
 	}
@@ -380,7 +382,7 @@ mod write_que_tests {
 		db.open_default(path.as_str().to_owned()).unwrap();
 		db.put("100500".as_bytes(), "1".as_bytes()).unwrap();
 		db.delete("100500".as_bytes()).unwrap();
-		db.flush().unwrap();
+		db.flush_all().unwrap();
 
 		let val = db.get("100500".as_bytes()).unwrap();
 		assert!(val.is_none());
diff --git a/db/src/traits.rs b/db/src/traits.rs
index d1fec666df1..13228bc1357 100644
--- a/db/src/traits.rs
+++ b/db/src/traits.rs
@@ -7,7 +7,7 @@
 
 pub type IteratorHandle = u32;
 
-pub const DEFAULT_CACHE_LEN: usize = 20480;
+pub const DEFAULT_CACHE_LEN: usize = 12288;
diff --git a/parity/db/main.rs b/parity/db/main.rs
index afd0cb21d69..1fb174739e1 100644
--- a/parity/db/main.rs
+++ b/parity/db/main.rs
@@ -96,12 +96,12 @@ fn main() {
 	let blocks_service_term = blocks_service.clone();
 	let extras_service_term = extras_service.clone();
 	CtrlC::set_handler(move || {
-		std::thread::sleep(std::time::Duration::new(1, 0));
-		stop.store(true, Ordering::Relaxed);
 		info!("Flushing writes...");
-
 		blocks_service_term.flush_all().unwrap();
 		extras_service_term.flush_all().unwrap();
+
+		std::thread::sleep(std::time::Duration::new(1, 0));
+		stop.store(true, Ordering::Relaxed);
 	});
 
 	let mut thread_pool = Pool::new(3);
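Taken together, patches 11 through 13 settle on a map-only write cache: the
VecDeque write log from patch 02 is gone, the last write per key wins, reads
are served from the cache first, and flushes drain bounded batches
(FLUSH_BATCH_SIZE = 4096) against a preferred cache length of
DEFAULT_CACHE_LEN = 12288. The sketch below distills that final shape without
the IPC and RocksDB plumbing; the Backend trait is a stand-in for the rocksdb
handle plus WriteBatch and is not part of the series. One subtlety worth
noting: a pending Remove reads as a plain cache miss, so Database::get falls
back to the backing store and can return a value whose delete is still queued
until the next flush.

use std::collections::HashMap;

// Stand-in for the rocksdb handle plus WriteBatch (an assumption for this
// sketch, not a type from the series).
trait Backend {
	fn put(&mut self, key: &[u8], val: &[u8]);
	fn delete(&mut self, key: &[u8]);
}

enum WriteCacheEntry {
	Remove,
	Write(Vec<u8>),
}

struct WriteQueue {
	cache: HashMap<Vec<u8>, WriteCacheEntry>,
	preferred_len: usize,
}

impl WriteQueue {
	fn new(preferred_len: usize) -> WriteQueue {
		WriteQueue { cache: HashMap::new(), preferred_len: preferred_len }
	}

	// Last write wins: a plain insert replaces any pending entry for the key.
	fn write(&mut self, key: Vec<u8>, val: Vec<u8>) {
		self.cache.insert(key, WriteCacheEntry::Write(val));
	}

	fn remove(&mut self, key: Vec<u8>) {
		self.cache.insert(key, WriteCacheEntry::Remove);
	}

	// Read-through: a pending Write is a hit; a pending Remove reads as a
	// miss, leaving the caller to consult the backing store.
	fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
		match self.cache.get(key) {
			Some(&WriteCacheEntry::Write(ref val)) => Some(val.clone()),
			_ => None,
		}
	}

	// Drain up to `amount` pending entries into one backend batch.
	fn flush<B: Backend>(&mut self, db: &mut B, amount: usize) {
		let keys: Vec<Vec<u8>> = self.cache.keys().take(amount).cloned().collect();
		for key in keys {
			match self.cache.remove(&key).expect("key was just taken from the map") {
				WriteCacheEntry::Write(val) => db.put(&key, &val),
				WriteCacheEntry::Remove => db.delete(&key),
			}
		}
	}

	// Flush only when over the preferred length; mirrors try_shrink().
	fn try_shrink<B: Backend>(&mut self, db: &mut B, batch: usize) {
		if self.cache.len() > self.preferred_len {
			self.flush(db, batch);
		}
	}

	fn is_empty(&self) -> bool {
		self.cache.is_empty()
	}
}

Usage mirrors the series' Database: put/delete feed the queue, the periodic
tick in parity/db/main.rs amounts to try_shrink, and close()/Ctrl-C drain
with flush until is_empty() holds.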