Skip to content

Commit

Permalink
pkg/flock package
Browse files Browse the repository at this point in the history
Introduce the pkg/flock package, a flock(2)-based implementation in
golang that can be used within and across process spaces. The main
motivation for this package is to benefit from the TryLock-semantics
offered by flock(2), which is not possible with c/storage.Locker file
lock since they are based on fcntl(2).

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Jul 1, 2019
1 parent 522d859 commit 06e4971
Show file tree
Hide file tree
Showing 84 changed files with 47,610 additions and 0 deletions.
63 changes: 63 additions & 0 deletions pkg/flock/flock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// flock exposes an API around flock(2) but allows for synchronizing within and
// across process spaces, making it suitable for currently running threads and
// processes alike. To support long-running daemons, the locks' file descriptors
// are closed when the lock is being released via Unlock() or RUnlock().
//
// Note that the flock package supports UNIX-only but compiles on Windows for
// compatibility reasons.

package flock

import (
"sync"

"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)

var (
errInternal = errors.New("internal error: flock")
errNotImplemented = errors.New("internal error: flock: not implemented")
)

// Flock implements a flock(2)-based file lock. It can be used to synchronize
// within the same process space and across process spaces.
type Flock interface {
// Lock locks the exclusive Flock for writing.
Lock() error
// TryLock locks the exclusive Flock without blocking. The first return
// value indicates if the Flock has been acquired.
TryLock() (bool, error)
// RLock locks the shared Flock for reading. Note that RLock is busy
// looping for acquiring the Flock.
RLock() error
// Unlock unlocks the Flock. If it's a shared Flock, unlock must be called
// by the number of RLock() calls for the underlying lock file to be
// unlocked.
Unlock() error
// Locked indicates if the Flock is locked.
Locked() bool
}

// flockType indicates the type of the Flock.
type flockType int

const (
// flockUnlocked indicates an unlock flock
flockUnlocked flockType = iota
// flockShared indicates a shared flock (i.e., reader)
flockShared
// flockExclusive indicates an exclusive flock (i.e., writer)
flockExclusive
)

// flock implements the Flock interface
type flock struct {
path string // path to the lock file
sem *semaphore.Weighted // process-space internal flock implementation

mutex *sync.Mutex // synchronization of state below
flockType flockType // current type of the lock
fd int // file descriptor
refs uint // reference counting
}
176 changes: 176 additions & 0 deletions pkg/flock/flock_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// +build linux solaris darwin freebsd

package flock

import (
"context"
"fmt"
"os"
"sync"

"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
"golang.org/x/sys/unix"
)

// New returns a Flock for path.
func New(path string) (flock, error) {
// Make sure we can operate on the path.
fd, err := unix.Open(path, os.O_RDWR|os.O_CREATE, unix.S_IRUSR|unix.S_IWUSR)
if err != nil {
return flock{}, err
}
if err := unix.Close(fd); err != nil {
return flock{}, err
}
return flock{
flockType: flockUnlocked,
fd: -1,
path: path,
mutex: new(sync.Mutex),
sem: semaphore.NewWeighted(1),
}, nil
}

// open opens the flock's path and sets the file descriptor. State must be
// synchronized by the caller.
func (f *flock) open() error {
if f.fd != -1 || f.flockType != flockUnlocked {
return fmt.Errorf("%v: trying to open improper Flock %q %d %d", errInternal, f.path, f.fd, f.flockType)
}
fd, err := unix.Open(f.path, os.O_RDWR|os.O_CREATE, unix.S_IRUSR|unix.S_IWUSR)
if err != nil {
return err
}
f.fd = fd
return nil
}

func (f *flock) Locked() bool {
f.mutex.Lock()
defer f.mutex.Unlock()
return f.refs > 0
}

func (f *flock) Lock() (err error) {
if err := f.sem.Acquire(context.Background(), 1); err != nil {
return err
}

defer func() {
if err != nil {
return
}
f.mutex.Lock()
f.refs++
f.flockType = flockExclusive
f.mutex.Unlock()
}()

f.mutex.Lock()
if err := f.open(); err != nil {
f.mutex.Unlock()
return err
}
f.mutex.Unlock()

return unix.Flock(f.fd, unix.LOCK_EX)
}

func (f *flock) TryLock() (acquired bool, err error) {
if acquired := f.sem.TryAcquire(1); !acquired {
return false, nil
}

f.mutex.Lock()
defer f.mutex.Unlock()

defer func() { // Deferred clean-ups.
if err == nil {
// No error, so we acquired the lock.
f.refs++
acquired = true
f.flockType = flockExclusive
return
}
if errors.Cause(err) == unix.EWOULDBLOCK {
// If it's a blocking error, nil the error.
err = nil
}
// Now, we still need to clean-up and close the file descriptor.
if closeErr := unix.Close(f.fd); closeErr != nil {
// Note: errors.Wrap*(nil, ...) is always nil
if err == nil {
err = closeErr
} else {
errors.Wrap(err, closeErr.Error())
}
}
f.fd = -1
f.sem.Release(1)
}()

if err := f.open(); err != nil {
return false, err
}
return acquired, unix.Flock(f.fd, unix.LOCK_EX|unix.LOCK_NB)
}

func (f *flock) RLock() error {
for { // Busy loop to avoid dead locks.
f.mutex.Lock()
// Unlock requires owning the mutex, so we will increment the ref conter
// **before** any concurrent threads may unlock underlying file.
if f.flockType == flockShared {
break
}
// If we don't already own the flock (see upper case), we fight with
// other threads for the semaphore.
if f.sem.TryAcquire(1) {
break
}
f.mutex.Unlock()
}

f.refs++
if f.refs > 1 {
f.mutex.Unlock()
return nil
}

if err := f.open(); err != nil {
f.mutex.Unlock()
return err
}

f.flockType = flockShared
f.mutex.Unlock()
return unix.Flock(f.fd, unix.LOCK_SH)
}

func (f *flock) Unlock() error {
// Note: it's crucial we own the mutex for the entire unlock() procedure.
// RLock() depends on it.
f.mutex.Lock()
defer f.mutex.Unlock()

if f.refs == 0 {
return fmt.Errorf("%v: unlocking unlocked Flock %q", errInternal, f.path)
}

f.refs--
if f.refs == 0 {
defer func() {
f.flockType = flockUnlocked
f.fd = -1
f.sem.Release(1)
}()
if err := unix.Flock(f.fd, unix.LOCK_UN); err != nil {
return err
}
if err := unix.Close(f.fd); err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit 06e4971

Please sign in to comment.