Skip to content

Commit

Permalink
temporary debugging stats
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeerbhola committed Feb 14, 2025
1 parent a8b60cf commit 0f515ce
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
3 changes: 3 additions & 0 deletions internal/base/compaction_grant_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type CompactionGrantHandleStats struct {
// TODO(sumeer): add more stats.
// cumReadBytes uint64
// cumReadBytesInCache uint64
Debugging struct {
EstimatedSize uint64
}
}

// CompactionGrantHandle is used to frequently update the CompactionScheduler
Expand Down
18 changes: 12 additions & 6 deletions internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ erro
for ; key != nil; key, value = r.iter.Next() {
iteratedKeys++
if iteratedKeys%updateGrantHandleEveryNKeys == 0 {
r.cfg.GrantHandle.CumulativeStats(base.CompactionGrantHandleStats{
CumWriteBytes: r.stats.CumulativeWrittenSize + tw.EstimatedSize(),
})
estimatedSize := tw.EstimatedSize()
stats := base.CompactionGrantHandleStats{
CumWriteBytes: r.stats.CumulativeWrittenSize + estimatedSize,
}
stats.Debugging.EstimatedSize = estimatedSize
r.cfg.GrantHandle.CumulativeStats(stats)
// TODO: need to give the GrantHandle to the writer so it can account on
// all its goroutines.
r.cfg.GrantHandle.MeasureCPU(0)
Expand Down Expand Up @@ -245,9 +248,12 @@ func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ erro
tw.SetSnapshotPinnedProperties(pinnedCount, pinnedKeySize, pinnedValueSize)
r.stats.CumulativePinnedKeys += pinnedCount
r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
r.cfg.GrantHandle.CumulativeStats(base.CompactionGrantHandleStats{
CumWriteBytes: r.stats.CumulativeWrittenSize + tw.EstimatedSize(),
})
estimatedSize := tw.EstimatedSize()
stats := base.CompactionGrantHandleStats{
CumWriteBytes: r.stats.CumulativeWrittenSize + estimatedSize,
}
stats.Debugging.EstimatedSize = estimatedSize
r.cfg.GrantHandle.CumulativeStats(stats)
r.cfg.GrantHandle.MeasureCPU(0)
return splitKey, nil
}
Expand Down
34 changes: 34 additions & 0 deletions objstorage/objstorageprovider/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package objstorageprovider

import (
"context"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -49,6 +50,7 @@ func (p *provider) vfsCreate(
NoSyncOnClose: p.st.NoSyncOnClose,
BytesPerSync: p.st.BytesPerSync,
})
file = newWriteShapingFile(file)
meta := objstorage.ObjectMetadata{
DiskFileNum: fileNum,
FileType: fileType,
Expand Down Expand Up @@ -117,3 +119,35 @@ func (p *provider) vfsSize(fileType base.FileType, fileNum base.DiskFileNum) (in
}
return stat.Size(), nil
}

const rateLimit = 10 << 20

type writeShapingFile struct {
vfs.File
createTime time.Time
writtenBytes int64
duration time.Duration
}

var _ vfs.File = (*writeShapingFile)(nil)

func newWriteShapingFile(file vfs.File) *writeShapingFile {
return &writeShapingFile{
File: file,
createTime: time.Now(),
}
}

func (f *writeShapingFile) Write(p []byte) (int, error) {
if f.writtenBytes > 0 {
expectedDurMillis := (time.Duration(f.writtenBytes) * 1e3)/ rateLimit
now := time.Now()
durMillis := now.Sub(f.createTime)/time.Millisecond
if durMillis < expectedDurMillis {
time.Sleep((expectedDurMillis - durMillis)*time.Millisecond)
}
}
f.writtenBytes += int64(len(p))
// This is a no-op, but it's useful for testing.
return f.File.Write(p)
}

0 comments on commit 0f515ce

Please sign in to comment.