From d4184f7392a9bda044804b3f42ab73906bba31fd Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Tue, 21 Jan 2020 06:41:58 +0000 Subject: [PATCH] implemented replay function for rocksdb --- ethdb/rocksdb/rocksdb-c-util.go | 30 +++++++++++++++++++++ ethdb/rocksdb/rocksdb.go | 47 ++++++++++++++++++++++++++------- 2 files changed, 68 insertions(+), 9 deletions(-) create mode 100644 ethdb/rocksdb/rocksdb-c-util.go diff --git a/ethdb/rocksdb/rocksdb-c-util.go b/ethdb/rocksdb/rocksdb-c-util.go new file mode 100644 index 000000000000..80bb683c57de --- /dev/null +++ b/ethdb/rocksdb/rocksdb-c-util.go @@ -0,0 +1,30 @@ +// rocksdb-c-util.go +// +build rocksdb + +package rocksdb + +/* + +#include +#include "rocksdb/c.h" + +extern void replayPut(char *, char *, size_t, char *, size_t); +extern void replayDel(char *, char *, size_t); + +static void replay_put(void *ptr, const char *k, size_t klen, const char *v, size_t vlen) +{ + replayPut((char *) ptr, (char *) k, klen, (char *) v, vlen); +} + +static void replay_del(void *ptr, const char *k, size_t klen) +{ + replayDel((char *) ptr, (char *) k, klen); +} + +void replay_iterate(int *wp, void *bp) +{ + rocksdb_writebatch_iterate(bp, wp, replay_put, replay_del); +} + +*/ +import "C" diff --git a/ethdb/rocksdb/rocksdb.go b/ethdb/rocksdb/rocksdb.go index 4743ba2876b7..2c697808d953 100644 --- a/ethdb/rocksdb/rocksdb.go +++ b/ethdb/rocksdb/rocksdb.go @@ -1,10 +1,16 @@ -// rocksdb_database.go +// rocksdb.go // +build rocksdb package rocksdb -// #include -// #include "rocksdb/c.h" +/* +#include +#include "rocksdb/c.h" + +typedef int * intp; + +extern void replay_iterate(int *wp, void *bp); +*/ import "C" import ( @@ -20,7 +26,7 @@ import ( // Metadium: db stats // (reads, read bytes, writes, written bytes, lookups, deletes) var ( - _stats_enabled = false + _stats_enabled = false _r_count, _r_bytes, _w_count, _w_bytes, _l_count, _d_count uint64 ) @@ -80,7 +86,7 @@ func (db *RDBDatabase) Path() string { func (db *RDBDatabase) Put(key []byte, value []byte) error { if _stats_enabled { atomic.AddUint64(&_w_count, 1) - atomic.AddUint64(&_w_bytes, uint64(len(key) + len(value))) + atomic.AddUint64(&_w_bytes, uint64(len(key)+len(value))) } var cerr *C.char ck, cv := b2c(key), b2c(value) @@ -131,7 +137,7 @@ func (db *RDBDatabase) Get(key []byte) ([]byte, error) { return nil, nil } if _stats_enabled { - atomic.AddUint64(&_r_bytes, uint64(len(key)) + uint64(C.int(cvl))) + atomic.AddUint64(&_r_bytes, uint64(len(key))+uint64(C.int(cvl))) } defer C.free(unsafe.Pointer(cv)) return C.GoBytes(unsafe.Pointer(cv), C.int(cvl)), nil @@ -206,7 +212,7 @@ type rdbBatch struct { func (b *rdbBatch) Put(key, value []byte) error { if _stats_enabled { atomic.AddUint64(&_w_count, 1) - atomic.AddUint64(&_w_bytes, uint64(len(key) + len(value))) + atomic.AddUint64(&_w_bytes, uint64(len(key)+len(value))) } ck, cv := b2c(key), b2c(value) C.rocksdb_writebatch_put(b.b, ck, C.size_t(len(key)), cv, C.size_t(len(value))) @@ -238,10 +244,33 @@ func (b *rdbBatch) Reset() { b.size = 0 } +//export replayPut +func replayPut(ptr *C.char, k *C.char, klen C.size_t, v *C.char, vlen C.size_t) { + w := (*ethdb.KeyValueWriter)(unsafe.Pointer(ptr)) + if w != nil { + (*w).Put(C.GoBytes(unsafe.Pointer(k), C.int(klen)), C.GoBytes(unsafe.Pointer(v), C.int(vlen))) + } +} + +//export replayDel +func replayDel(ptr *C.char, k *C.char, klen C.size_t) { + w := (*ethdb.KeyValueWriter)(unsafe.Pointer(ptr)) + if w != nil { + (*w).Delete(C.GoBytes(unsafe.Pointer(k), C.int(klen))) + } +} + // Replay replays the batch contents. func (b *rdbBatch) Replay(w ethdb.KeyValueWriter) error { - return errors.New("Not implemented") - //return b.b.Replay(&replayer{writer: w}) + repsize := C.size_t(0) + rep := C.rocksdb_writebatch_data(b.b, &repsize) + b2 := C.rocksdb_writebatch_create_from(rep, repsize) + bp := unsafe.Pointer(b2) + wp := (C.intp)(unsafe.Pointer(&w)) + + C.replay_iterate(wp, bp) + C.rocksdb_writebatch_destroy(b2) + return nil } // replayer is a small wrapper to implement the correct replay methods.