Skip to content

Commit

Permalink
implemented replay function for rocksdb
Browse files Browse the repository at this point in the history
  • Loading branch information
sadoci committed Jan 21, 2020
1 parent 51e8a72 commit d4184f7
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
30 changes: 30 additions & 0 deletions ethdb/rocksdb/rocksdb-c-util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// rocksdb-c-util.go
// +build rocksdb

package rocksdb

/*
#include <stdlib.h>
#include "rocksdb/c.h"
extern void replayPut(char *, char *, size_t, char *, size_t);
extern void replayDel(char *, char *, size_t);
static void replay_put(void *ptr, const char *k, size_t klen, const char *v, size_t vlen)
{
replayPut((char *) ptr, (char *) k, klen, (char *) v, vlen);
}
static void replay_del(void *ptr, const char *k, size_t klen)
{
replayDel((char *) ptr, (char *) k, klen);
}
void replay_iterate(int *wp, void *bp)
{
rocksdb_writebatch_iterate(bp, wp, replay_put, replay_del);
}
*/
import "C"
47 changes: 38 additions & 9 deletions ethdb/rocksdb/rocksdb.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
// rocksdb_database.go
// rocksdb.go
// +build rocksdb

package rocksdb

// #include <stdlib.h>
// #include "rocksdb/c.h"
/*
#include <stdlib.h>
#include "rocksdb/c.h"
typedef int * intp;
extern void replay_iterate(int *wp, void *bp);
*/
import "C"

import (
Expand All @@ -20,7 +26,7 @@ import (
// Metadium: db stats
// (reads, read bytes, writes, written bytes, lookups, deletes)
var (
_stats_enabled = false
_stats_enabled = false
_r_count, _r_bytes, _w_count, _w_bytes, _l_count, _d_count uint64
)

Expand Down Expand Up @@ -80,7 +86,7 @@ func (db *RDBDatabase) Path() string {
func (db *RDBDatabase) Put(key []byte, value []byte) error {
if _stats_enabled {
atomic.AddUint64(&_w_count, 1)
atomic.AddUint64(&_w_bytes, uint64(len(key) + len(value)))
atomic.AddUint64(&_w_bytes, uint64(len(key)+len(value)))
}
var cerr *C.char
ck, cv := b2c(key), b2c(value)
Expand Down Expand Up @@ -131,7 +137,7 @@ func (db *RDBDatabase) Get(key []byte) ([]byte, error) {
return nil, nil
}
if _stats_enabled {
atomic.AddUint64(&_r_bytes, uint64(len(key)) + uint64(C.int(cvl)))
atomic.AddUint64(&_r_bytes, uint64(len(key))+uint64(C.int(cvl)))
}
defer C.free(unsafe.Pointer(cv))
return C.GoBytes(unsafe.Pointer(cv), C.int(cvl)), nil
Expand Down Expand Up @@ -206,7 +212,7 @@ type rdbBatch struct {
func (b *rdbBatch) Put(key, value []byte) error {
if _stats_enabled {
atomic.AddUint64(&_w_count, 1)
atomic.AddUint64(&_w_bytes, uint64(len(key) + len(value)))
atomic.AddUint64(&_w_bytes, uint64(len(key)+len(value)))
}
ck, cv := b2c(key), b2c(value)
C.rocksdb_writebatch_put(b.b, ck, C.size_t(len(key)), cv, C.size_t(len(value)))
Expand Down Expand Up @@ -238,10 +244,33 @@ func (b *rdbBatch) Reset() {
b.size = 0
}

//export replayPut
func replayPut(ptr *C.char, k *C.char, klen C.size_t, v *C.char, vlen C.size_t) {
w := (*ethdb.KeyValueWriter)(unsafe.Pointer(ptr))
if w != nil {
(*w).Put(C.GoBytes(unsafe.Pointer(k), C.int(klen)), C.GoBytes(unsafe.Pointer(v), C.int(vlen)))
}
}

//export replayDel
func replayDel(ptr *C.char, k *C.char, klen C.size_t) {
w := (*ethdb.KeyValueWriter)(unsafe.Pointer(ptr))
if w != nil {
(*w).Delete(C.GoBytes(unsafe.Pointer(k), C.int(klen)))
}
}

// Replay replays the batch contents.
func (b *rdbBatch) Replay(w ethdb.KeyValueWriter) error {
return errors.New("Not implemented")
//return b.b.Replay(&replayer{writer: w})
repsize := C.size_t(0)
rep := C.rocksdb_writebatch_data(b.b, &repsize)
b2 := C.rocksdb_writebatch_create_from(rep, repsize)
bp := unsafe.Pointer(b2)
wp := (C.intp)(unsafe.Pointer(&w))

C.replay_iterate(wp, bp)
C.rocksdb_writebatch_destroy(b2)
return nil
}

// replayer is a small wrapper to implement the correct replay methods.
Expand Down

0 comments on commit d4184f7

Please sign in to comment.