Skip to content

Commit

Permalink
fix: batched put, delete and commit operations are now measured separ…
Browse files Browse the repository at this point in the history
…ately from regular operations
  • Loading branch information
aschmahmann committed Nov 26, 2019
1 parent c684950 commit 806c559
Showing 1 changed file with 53 additions and 46 deletions.
99 changes: 53 additions & 46 deletions measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ func New(prefix string, ds datastore.Datastore) *measure {
duErr: metrics.New(prefix+".du.errors_total", "Number of errored Datastore.DiskUsage calls").Counter(),
duLatency: metrics.New(prefix+".du.latency_seconds",
"Latency distribution of Datastore.DiskUsage calls").Histogram(datastoreLatencyBuckets),

batchPutNum: metrics.New(prefix+".batchput_total", "Total number of Batch.Put calls").Counter(),
batchPutErr: metrics.New(prefix+".batchput.errors_total", "Number of errored Batch.Put calls").Counter(),
batchPutLatency: metrics.New(prefix+".batchput.latency_seconds",
"Latency distribution of Batch.Put calls").Histogram(datastoreLatencyBuckets),
batchPutSize: metrics.New(prefix+".batchput.size_bytes",
"Size distribution of byte slices put into batches").Histogram(datastoreSizeBuckets),

batchDeleteNum: metrics.New(prefix+".batchdelete_total", "Total number of Batch.Delete calls").Counter(),
batchDeleteErr: metrics.New(prefix+".batchdelete.errors_total", "Number of errored Batch.Delete calls").Counter(),
batchDeleteLatency: metrics.New(prefix+".batchdelete.latency_seconds",
"Latency distribution of Batch.Delete calls").Histogram(datastoreLatencyBuckets),

batchCommitNum: metrics.New(prefix+".batchcommit_total", "Total number of Batch.Commit calls").Counter(),
batchCommitErr: metrics.New(prefix+".batchcommit.errors_total", "Number of errored Batch.Commit calls").Counter(),
batchCommitLatency: metrics.New(prefix+".batchcommit.latency_seconds",
"Latency distribution of Batch.Commit calls").Histogram(datastoreLatencyBuckets),
}
return m
}
Expand Down Expand Up @@ -125,6 +142,19 @@ type measure struct {
duNum metrics.Counter
duErr metrics.Counter
duLatency metrics.Histogram

batchPutNum metrics.Counter
batchPutErr metrics.Counter
batchPutLatency metrics.Histogram
batchPutSize metrics.Histogram

batchDeleteNum metrics.Counter
batchDeleteErr metrics.Counter
batchDeleteLatency metrics.Histogram

batchCommitNum metrics.Counter
batchCommitErr metrics.Counter
batchCommitLatency metrics.Histogram
}

func recordLatency(h metrics.Histogram, start time.Time) {
Expand Down Expand Up @@ -251,12 +281,7 @@ func (m *measure) DiskUsage() (uint64, error) {
}

type measuredBatch struct {
puts int
deletes int

putts datastore.Batch
delts datastore.Batch

b datastore.Batch
m *measure
}

Expand All @@ -265,64 +290,46 @@ func (m *measure) Batch() (datastore.Batch, error) {
if !ok {
return nil, datastore.ErrBatchUnsupported
}
pb, err := bds.Batch()
if err != nil {
return nil, err
}

db, err := bds.Batch()
batch, err := bds.Batch()
if err != nil {
return nil, err
}

return &measuredBatch{
putts: pb,
delts: db,

b: batch,
m: m,
}, nil
}

func (mt *measuredBatch) Put(key datastore.Key, val []byte) error {
mt.puts++
mt.m.putSize.Observe(float64(len(val)))
return mt.putts.Put(key, val)
}

func (mt *measuredBatch) Delete(key datastore.Key) error {
mt.deletes++
return mt.delts.Delete(key)
}

func (mt *measuredBatch) Commit() error {
err := logBatchCommit(mt.delts, mt.deletes, mt.m.deleteNum, mt.m.deleteErr, mt.m.deleteLatency)
defer recordLatency(mt.m.batchPutLatency, time.Now())
mt.m.batchPutNum.Inc()
mt.m.batchPutSize.Observe(float64(len(val)))
err := mt.b.Put(key, val)
if err != nil {
return err
mt.m.batchPutErr.Inc()
}
return err
}

err = logBatchCommit(mt.putts, mt.puts, mt.m.putNum, mt.m.putErr, mt.m.putLatency)
func (mt *measuredBatch) Delete(key datastore.Key) error {
defer recordLatency(mt.m.batchDeleteLatency, time.Now())
mt.m.batchDeleteNum.Inc()
err := mt.b.Delete(key)
if err != nil {
return err
mt.m.batchDeleteErr.Inc()
}

return nil
return err
}

func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat metrics.Histogram) error {
if n > 0 {
before := time.Now()
err := b.Commit()
took := time.Since(before) / time.Duration(n)
num.Add(float64(n))
for i := 0; i < n; i++ {
lat.Observe(took.Seconds())
}
if err != nil {
errs.Inc()
return err
}
func (mt *measuredBatch) Commit() error {
defer recordLatency(mt.m.batchCommitLatency, time.Now())
mt.m.batchCommitNum.Inc()
err := mt.b.Commit()
if err != nil {
mt.m.batchCommitErr.Inc()
}
return nil
return err
}

func (m *measure) Close() error {
Expand Down

0 comments on commit 806c559

Please sign in to comment.