-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtxn.go
82 lines (72 loc) · 1.81 KB
/
txn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package db
// Txn is Transaction struct for Optimistic Concurrency Control.
type Txn struct {
db *DB
startTs uint64
commitTs uint64
writeCache map[string]*Entry
readSet map[string]uint64
}
// StartTxn returns a new Txn to perform ops on
func (db *DB) StartTxn() *Txn {
return &Txn{
db: db,
startTs: db.oracle.requestStart(),
writeCache: make(map[string]*Entry),
readSet: make(map[string]uint64),
}
}
// Read gets value for a key from the DB and updates the txn readSet
func (txn *Txn) Read(key string) (*Entry, error) {
entry, err := txn.db.read(key, txn.startTs)
if err != nil {
return nil, err
}
txn.readSet[key] = entry.ts
return entry, nil
}
// Write updates the write cache of the txn
func (txn *Txn) Write(key string, attributes map[string]*Value) {
txn.writeCache[key] = &Entry{
Key: key,
Attributes: attributes,
}
}
// Delete updates the write cache of the txn
func (txn *Txn) Delete(key string) {
txn.writeCache[key] = &Entry{
Key: key,
Attributes: nil,
}
}
// Scan gets a range of values from a start key to an end key from the DB and updates the txn readSet
func (txn *Txn) Scan(startKey, endKey string) ([]*Entry, error) {
kvs, err := txn.db.scan(startKey, endKey, txn.startTs)
if err != nil {
return nil, err
}
for _, kv := range kvs {
txn.readSet[kv.Key] = kv.ts
}
return kvs, nil
}
// Exists checks if key exists in the db
func (txn *Txn) Exists(key string) (bool, error) {
_, err := txn.Read(key)
if err != nil {
switch err.(type) {
case *ErrKeyNotFound:
return false, nil
default:
return false, err
}
}
return true, nil
}
// Commit sends the txn's read and write set to the oracle for commit
func (txn *Txn) Commit() error {
if len(txn.writeCache) == 0 {
return nil
}
return txn.db.oracle.commit(txn.readSet, txn.writeCache)
}