Skip to content

Commit

Permalink
vfs: clean up Fd functionality
Browse files Browse the repository at this point in the history
We currently sidestep the `vfs.File` interface to obtain the raw file
descriptor when the `File` is backed by an underlying `*os.File`.

This mechanism (where we cast to `interface{ Fd() }`) is fragile and
requires extra "fd wrappers" to plumb the `Fd` method through disk full and disk health wrappers.
Each fd wrapper introduces an extra virtual table indirection on every
File interface call.

This change cleans this up by making all `File`s implement `Fd()`; we
allow returning 0 when there is no raw file descriptor. It is up to
the (very few) callers to check if the result is 0.
  • Loading branch information
RaduBerinde committed Jan 23, 2023
1 parent 7953ed0 commit 3fc1932
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 85 deletions.
4 changes: 4 additions & 0 deletions internal/errorfs/errorfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,7 @@ func (f *errorFile) Sync() error {
}
return f.file.Sync()
}

func (f *errorFile) Fd() uintptr {
return f.file.Fd()
}
4 changes: 3 additions & 1 deletion sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3021,7 +3021,9 @@ func (r *Reader) readBlock(
Fd() uintptr
}
if f, ok := r.file.(fd); ok {
_ = vfs.Prefetch(f.Fd(), bh.Offset, uint64(readaheadSize))
if fd := f.Fd(); fd != 0 {
_ = vfs.Prefetch(fd, bh.Offset, uint64(readaheadSize))
}
}
}
}
Expand Down
35 changes: 20 additions & 15 deletions vfs/disk_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func (fs *enospcFS) Create(name string) (File, error) {
f, err = fs.inner.Create(name)
}
if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}
Expand All @@ -222,21 +222,21 @@ func (fs *enospcFS) Link(oldname, newname string) error {
func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) {
f, err := fs.inner.Open(name, opts...)
if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}

func (fs *enospcFS) OpenDir(name string) (File, error) {
f, err := fs.inner.OpenDir(name)
if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}
Expand Down Expand Up @@ -288,10 +288,10 @@ func (fs *enospcFS) ReuseForWrite(oldname, newname string) (File, error) {
}

if f != nil {
f = WithFd(f, enospcFile{
f = &enospcFile{
fs: fs,
inner: f,
})
}
}
return f, err
}
Expand Down Expand Up @@ -349,19 +349,21 @@ type enospcFile struct {
inner File
}

func (f enospcFile) Close() error {
var _ File = (*enospcFile)(nil)

func (f *enospcFile) Close() error {
return f.inner.Close()
}

func (f enospcFile) Read(p []byte) (n int, err error) {
func (f *enospcFile) Read(p []byte) (n int, err error) {
return f.inner.Read(p)
}

func (f enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
return f.inner.ReadAt(p, off)
}

func (f enospcFile) Write(p []byte) (n int, err error) {
func (f *enospcFile) Write(p []byte) (n int, err error) {
gen := f.fs.waitUntilReady()

n, err = f.inner.Write(p)
Expand All @@ -375,11 +377,11 @@ func (f enospcFile) Write(p []byte) (n int, err error) {
return n, err
}

func (f enospcFile) Stat() (os.FileInfo, error) {
func (f *enospcFile) Stat() (os.FileInfo, error) {
return f.inner.Stat()
}

func (f enospcFile) Sync() error {
func (f *enospcFile) Sync() error {
gen := f.fs.waitUntilReady()

err := f.inner.Sync()
Expand All @@ -399,7 +401,10 @@ func (f enospcFile) Sync() error {
return err
}

// Ensure that *enospcFS implements the FS interface.
func (f *enospcFile) Fd() uintptr {
return f.inner.Fd()
}

var _ FS = (*enospcFS)(nil)

func isENOSPC(err error) bool {
Expand Down
6 changes: 3 additions & 3 deletions vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) {
d.onSlowDisk(name, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
return checkingFile, nil
}

// GetDiskUsage implements the FS interface.
Expand Down Expand Up @@ -420,7 +420,7 @@ func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
d.onSlowDisk(name, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
return checkingFile, nil
}

// PathBase implements the FS interface.
Expand Down Expand Up @@ -482,7 +482,7 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err
d.onSlowDisk(newname, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
return checkingFile, nil
}

// Stat implements the FS interface.
Expand Down
4 changes: 4 additions & 0 deletions vfs/disk_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (m mockFile) Stat() (os.FileInfo, error) {
panic("unimplemented")
}

func (m mockFile) Fd() uintptr {
return 0
}

func (m mockFile) Sync() error {
time.Sleep(m.syncDuration)
return nil
Expand Down
42 changes: 0 additions & 42 deletions vfs/fd.go

This file was deleted.

8 changes: 2 additions & 6 deletions vfs/fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@ func TestFileWrappersHaveFd(t *testing.T) {
defer closer.Close()
f2, err := fs2.Open(filename)
require.NoError(t, err)
if _, ok := f2.(fdGetter); !ok {
t.Fatal("expected diskHealthCheckingFile to export Fd() method")
}
require.NotZero(t, f2.Fd())
// File wrapper case 2: Check if syncingFile has Fd().
f3 := NewSyncingFile(f2, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */})
if _, ok := f3.(fdGetter); !ok {
t.Fatal("expected syncingFile to export Fd() method")
}
require.NotZero(t, f3.Fd())
require.NoError(t, f2.Close())
}
6 changes: 6 additions & 0 deletions vfs/mem_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,8 @@ type memFile struct {
read, write bool
}

var _ File = (*memFile)(nil)

func (f *memFile) Close() error {
if n := atomic.AddInt32(&f.n.refs, -1); n < 0 {
panic(fmt.Sprintf("pebble: close of unopened file: %d", n))
Expand Down Expand Up @@ -721,6 +723,10 @@ func (f *memFile) Sync() error {
return nil
}

func (f *memFile) Fd() uintptr {
return 0
}

// Flush is a no-op and present only to prevent buffering at higher levels
// (e.g. it prevents sstable.Writer from using a bufio.Writer).
func (f *memFile) Flush() error {
Expand Down
10 changes: 3 additions & 7 deletions vfs/syncing_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type SyncingFileOptions struct {

type syncingFile struct {
File
// fd can be 0 if the underlying File does not support it.
fd uintptr
useSyncRange bool
closing bool
Expand Down Expand Up @@ -49,6 +50,7 @@ type syncingFile struct {
func NewSyncingFile(f File, opts SyncingFileOptions) File {
s := &syncingFile{
File: f,
fd: f.Fd(),
noSyncOnClose: bool(opts.NoSyncOnClose),
bytesPerSync: int64(opts.BytesPerSync),
preallocateSize: int64(opts.PreallocateSize),
Expand All @@ -57,12 +59,6 @@ func NewSyncingFile(f File, opts SyncingFileOptions) File {
// data has been written to it.
s.atomic.syncOffset = -1

type fd interface {
Fd() uintptr
}
if d, ok := f.(fd); ok {
s.fd = d.Fd()
}
type dhChecker interface {
timeDiskOp(op func())
}
Expand All @@ -79,7 +75,7 @@ func NewSyncingFile(f File, opts SyncingFileOptions) File {
if s.syncData == nil {
s.syncData = s.File.Sync
}
return WithFd(f, s)
return s
}

// NB: syncingFile.Write is unsafe for concurrent use!
Expand Down
1 change: 0 additions & 1 deletion vfs/syncing_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func TestSyncingFile(t *testing.T) {
t.Fatalf("failed to wrap: %p != %p", f, s)
}
s = NewSyncingFile(f, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */})
s = s.(*fdFileWrapper).File
s.(*syncingFile).fd = 1
s.(*syncingFile).syncTo = func(offset int64) error {
s.(*syncingFile).ratchetSyncOffset(offset)
Expand Down
19 changes: 9 additions & 10 deletions vfs/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type File interface {
io.Writer
Stat() (os.FileInfo, error)
Sync() error

// Fd returns the raw file descriptor when a File is backed by an *os.File.
// It can be used for specific functionality like Prefetch.
// Returns 0 if not supported.
Fd() uintptr
}

// OpenOption provide an interface to do work on file handles in the Open()
Expand Down Expand Up @@ -240,11 +245,8 @@ var RandomReadsOption OpenOption = &randomReadsOption{}

// Apply implements the OpenOption interface.
func (randomReadsOption) Apply(f File) {
type fd interface {
Fd() uintptr
}
if fdFile, ok := f.(fd); ok {
_ = fadviseRandom(fdFile.Fd())
if fd := f.Fd(); fd != 0 {
_ = fadviseRandom(fd)
}
}

Expand All @@ -257,11 +259,8 @@ var SequentialReadsOption OpenOption = &sequentialReadsOption{}

// Apply implements the OpenOption interface.
func (sequentialReadsOption) Apply(f File) {
type fd interface {
Fd() uintptr
}
if fdFile, ok := f.(fd); ok {
_ = fadviseSequential(fdFile.Fd())
if fd := f.Fd(); fd != 0 {
_ = fadviseSequential(fd)
}
}

Expand Down

0 comments on commit 3fc1932

Please sign in to comment.