Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

cache manager and clearing tracing cache #1769

Merged
merged 7 commits into from
Jul 30, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 32 additions & 83 deletions ethcore/src/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

//! Blockchain database.

use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrder};
use bloomchain as bc;
use util::*;
use header::*;
Expand All @@ -32,6 +31,7 @@ use blockchain::update::ExtrasUpdate;
use blockchain::{CacheSize, ImportRoute, Config};
use db::{Writable, Readable, CacheUpdatePolicy};
use client::{DB_COL_EXTRA, DB_COL_HEADERS, DB_COL_BODIES};
use cache_manager::CacheManager;

const LOG_BLOOMS_LEVELS: usize = 3;
const LOG_BLOOMS_ELEMENTS_PER_INDEX: usize = 16;
Expand Down Expand Up @@ -130,11 +130,6 @@ enum CacheID {
BlockReceipts(H256),
}

struct CacheManager {
cache_usage: VecDeque<HashSet<CacheID>>,
in_use: HashSet<CacheID>,
}

impl bc::group::BloomGroupDatabase for BlockChain {
fn blooms_at(&self, position: &bc::group::GroupPosition) -> Option<bc::group::BloomGroup> {
let position = LogGroupPosition::from(position.clone());
Expand All @@ -148,8 +143,6 @@ impl bc::group::BloomGroupDatabase for BlockChain {
/// **Does not do input data verification.**
pub struct BlockChain {
// All locks must be captured in the order declared here.
pref_cache_size: AtomicUsize,
max_cache_size: AtomicUsize,
blooms_config: bc::Config,

best_block: RwLock<BestBlock>,
Expand All @@ -167,7 +160,7 @@ pub struct BlockChain {

db: Arc<Database>,

cache_man: RwLock<CacheManager>,
cache_man: RwLock<CacheManager<CacheID>>,
}

impl BlockProvider for BlockChain {
Expand Down Expand Up @@ -297,8 +290,6 @@ impl BlockProvider for BlockChain {
}
}

const COLLECTION_QUEUE_SIZE: usize = 8;

pub struct AncestryIter<'a> {
current: H256,
chain: &'a BlockChain,
Expand All @@ -320,12 +311,10 @@ impl<'a> Iterator for AncestryIter<'a> {
impl BlockChain {
/// Create new instance of blockchain from given Genesis
pub fn new(config: Config, genesis: &[u8], db: Arc<Database>) -> BlockChain {
let mut cache_man = CacheManager{cache_usage: VecDeque::new(), in_use: HashSet::new()};
(0..COLLECTION_QUEUE_SIZE).foreach(|_| cache_man.cache_usage.push_back(HashSet::new()));
// 400 is the avarage size of the key
let cache_man = CacheManager::new(config.pref_cache_size, config.max_cache_size, 400);

let bc = BlockChain {
pref_cache_size: AtomicUsize::new(config.pref_cache_size),
max_cache_size: AtomicUsize::new(config.max_cache_size),
blooms_config: bc::Config {
levels: LOG_BLOOMS_LEVELS,
elements_per_index: LOG_BLOOMS_ELEMENTS_PER_INDEX,
Expand Down Expand Up @@ -449,12 +438,6 @@ impl BlockChain {
None
}

/// Set the cache configuration.
pub fn configure_cache(&self, pref_cache_size: usize, max_cache_size: usize) {
self.pref_cache_size.store(pref_cache_size, AtomicOrder::Relaxed);
self.max_cache_size.store(max_cache_size, AtomicOrder::Relaxed);
}

/// Returns a tree route between `from` and `to`, which is a tuple of:
///
/// - a vector of hashes of all blocks, ordered from `from` to `to`.
Expand Down Expand Up @@ -874,74 +857,40 @@ impl BlockChain {
/// Let the cache system know that a cacheable item has been used.
fn note_used(&self, id: CacheID) {
let mut cache_man = self.cache_man.write();
if !cache_man.cache_usage[0].contains(&id) {
cache_man.cache_usage[0].insert(id.clone());
if cache_man.in_use.contains(&id) {
if let Some(c) = cache_man.cache_usage.iter_mut().skip(1).find(|e|e.contains(&id)) {
c.remove(&id);
}
} else {
cache_man.in_use.insert(id);
}
}
cache_man.note_used(id);
}

/// Ticks our cache system and throws out any old data.
pub fn collect_garbage(&self) {
if self.cache_size().total() < self.pref_cache_size.load(AtomicOrder::Relaxed) {
// rotate cache
let mut cache_man = self.cache_man.write();
const AVERAGE_BYTES_PER_CACHE_ENTRY: usize = 400; //estimated
if cache_man.cache_usage[0].len() > self.pref_cache_size.load(AtomicOrder::Relaxed) / COLLECTION_QUEUE_SIZE / AVERAGE_BYTES_PER_CACHE_ENTRY {
trace!("Cache rotation, cache_size = {}", self.cache_size().total());
let cache = cache_man.cache_usage.pop_back().unwrap();
cache_man.cache_usage.push_front(cache);
}
return;
}

for i in 0..COLLECTION_QUEUE_SIZE {
{
trace!("Cache cleanup round started {}, cache_size = {}", i, self.cache_size().total());
let mut block_headers = self.block_headers.write();
let mut block_bodies = self.block_bodies.write();
let mut block_details = self.block_details.write();
let mut block_hashes = self.block_hashes.write();
let mut transaction_addresses = self.transaction_addresses.write();
let mut blocks_blooms = self.blocks_blooms.write();
let mut block_receipts = self.block_receipts.write();
let mut cache_man = self.cache_man.write();

for id in cache_man.cache_usage.pop_back().unwrap().into_iter() {
cache_man.in_use.remove(&id);
match id {
CacheID::BlockHeader(h) => { block_headers.remove(&h); },
CacheID::BlockBody(h) => { block_bodies.remove(&h); },
CacheID::BlockDetails(h) => { block_details.remove(&h); }
CacheID::BlockHashes(h) => { block_hashes.remove(&h); }
CacheID::TransactionAddresses(h) => { transaction_addresses.remove(&h); }
CacheID::BlocksBlooms(h) => { blocks_blooms.remove(&h); }
CacheID::BlockReceipts(h) => { block_receipts.remove(&h); }
}
let mut cache_man = self.cache_man.write();
cache_man.collect_carbage(|| self.cache_size().total(), | ids | {
let mut block_headers = self.block_headers.write();
let mut block_bodies = self.block_bodies.write();
let mut block_details = self.block_details.write();
let mut block_hashes = self.block_hashes.write();
let mut transaction_addresses = self.transaction_addresses.write();
let mut blocks_blooms = self.blocks_blooms.write();
let mut block_receipts = self.block_receipts.write();

for id in &ids {
match *id {
CacheID::BlockHeader(ref h) => { block_headers.remove(h); },
CacheID::BlockBody(ref h) => { block_bodies.remove(h); },
CacheID::BlockDetails(ref h) => { block_details.remove(h); }
CacheID::BlockHashes(ref h) => { block_hashes.remove(h); }
CacheID::TransactionAddresses(ref h) => { transaction_addresses.remove(h); }
CacheID::BlocksBlooms(ref h) => { blocks_blooms.remove(h); }
CacheID::BlockReceipts(ref h) => { block_receipts.remove(h); }
}
cache_man.cache_usage.push_front(HashSet::new());

// TODO: handle block_hashes properly.
block_hashes.clear();

block_headers.shrink_to_fit();
block_bodies.shrink_to_fit();
block_details.shrink_to_fit();
block_hashes.shrink_to_fit();
transaction_addresses.shrink_to_fit();
blocks_blooms.shrink_to_fit();
block_receipts.shrink_to_fit();
}
trace!("Cache cleanup round complete {}, cache_size = {}", i, self.cache_size().total());
if self.cache_size().total() < self.max_cache_size.load(AtomicOrder::Relaxed) { break; }
}

// TODO: m_lastCollection = chrono::system_clock::now();
block_headers.shrink_to_fit();
block_bodies.shrink_to_fit();
block_details.shrink_to_fit();
block_hashes.shrink_to_fit();
transaction_addresses.shrink_to_fit();
blocks_blooms.shrink_to_fit();
block_receipts.shrink_to_fit();
});
}

/// Create a block body from a block.
Expand Down
69 changes: 69 additions & 0 deletions ethcore/src/cache_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{VecDeque, HashSet};
use std::hash::Hash;

const COLLECTION_QUEUE_SIZE: usize = 8;

pub struct CacheManager<T> where T: Eq + Hash {
pref_cache_size: usize,
max_cache_size: usize,
bytes_per_cache_entry: usize,
cache_usage: VecDeque<HashSet<T>>
}

impl<T> CacheManager<T> where T: Eq + Hash {
pub fn new(pref_cache_size: usize, max_cache_size: usize, bytes_per_cache_entry: usize) -> Self {
CacheManager {
pref_cache_size: pref_cache_size,
max_cache_size: max_cache_size,
bytes_per_cache_entry: bytes_per_cache_entry,
cache_usage: (0..COLLECTION_QUEUE_SIZE).into_iter().map(|_| Default::default()).collect(),
}
}

pub fn note_used(&mut self, id: T) {
if !self.cache_usage[0].contains(&id) {
if let Some(c) = self.cache_usage.iter_mut().skip(1).find(|e| e.contains(&id)) {
c.remove(&id);
}
self.cache_usage[0].insert(id);
}
}

pub fn collect_carbage<C, F>(&mut self, current_size: C, mut notify_unused: F) where C: Fn() -> usize, F: FnMut(HashSet<T>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

carbage -> garbage

if current_size() < self.pref_cache_size {
self.rotate_cache_if_needed();
return;
}

for _ in 0..COLLECTION_QUEUE_SIZE {
notify_unused(self.cache_usage.pop_back().unwrap());
self.cache_usage.push_front(Default::default());
if current_size() < self.max_cache_size {
break;
}
}
}

fn rotate_cache_if_needed(&mut self) {
if self.cache_usage[0].len() * self.bytes_per_cache_entry > self.pref_cache_size / COLLECTION_QUEUE_SIZE {
let cache = self.cache_usage.pop_back().unwrap();
self.cache_usage.push_front(cache);
}
}
}
6 changes: 1 addition & 5 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ impl Client {
pub fn tick(&self) {
self.chain.collect_garbage();
self.block_queue.collect_garbage();
self.tracedb.collect_garbage();

match self.mode {
Mode::Dark(timeout) => {
Expand Down Expand Up @@ -581,11 +582,6 @@ impl Client {
}
}

/// Set up the cache behaviour.
pub fn configure_cache(&self, pref_cache_size: usize, max_cache_size: usize) {
self.chain.configure_cache(pref_cache_size, max_cache_size);
}

/// Look up the block number for the given block ID.
pub fn block_number(&self, id: BlockID) -> Option<BlockNumber> {
match id {
Expand Down
1 change: 1 addition & 0 deletions ethcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub mod snapshot;
pub mod action_params;
#[macro_use] pub mod evm;

mod cache_manager;
mod blooms;
mod db;
mod common;
Expand Down
9 changes: 6 additions & 3 deletions ethcore/src/trace/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ pub struct Config {
pub enabled: Switch,
/// Traces blooms configuration.
pub blooms: BloomConfig,
/// Database cache-size if not default
pub db_cache_size: Option<usize>,
/// Preferef cache-size.
pub pref_cache_size: usize,
/// Max cache-size.
pub max_cache_size: usize,
}

impl Default for Config {
Expand All @@ -80,7 +82,8 @@ impl Default for Config {
levels: 3,
elements_per_index: 16,
},
db_cache_size: None,
pref_cache_size: 15 * 1024 * 1024,
max_cache_size: 20 * 1024 * 1024,
}
}
}
Expand Down
Loading