Skip to content

Commit

Permalink
feat(replica): run fsync on files & dir after create/remove/rename op…
Browse files Browse the repository at this point in the history
…eration on files (#278)

We open files in O_DIRECT mode, which minimizes cache effects but doesn't
guarantee that data is written to the disk. A periodic sync ensures that both
data and metadata are flushed to the disk every 5 seconds. See the man page of
the open system call for more details. Also, since fsync on a file descriptor
doesn't guarantee that the file's directory entry has reached the disk, an
fsync on the directory is also required.

In this commit, a goroutine is launched to perform sync on the data files and
the directory. Replicas are also synced and closed gracefully before exiting
fatally on error or being killed.

Signed-off-by: Utkarsh Mani Tripathi <[email protected]>
  • Loading branch information
Utkarsh Mani Tripathi authored Apr 14, 2020
1 parent 6895a58 commit 2b621d1
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 21 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ help:
@echo "Usage:-"
@echo "\tmake build -- will create jiva image"
@echo "\tmake deps -- will verify build dependencies are installed"
@echo "\tARCH = $(ARCH) -- arch where make is running"
@echo ""


Expand Down Expand Up @@ -61,10 +62,9 @@ mod: go.mod go.sum
deps: _build_check_go _build_check_docker mod
@echo "INFO:\tVerifying dependencies for jiva"


_run_ci:
@echo "INFO:\tRun ci over jiva image"
sudo bash ./ci/start_init_test.sh
sudo -E bash ./ci/start_init_test.sh

test:
@echo "INFO:\tRun ci over jiva image"
Expand Down
3 changes: 2 additions & 1 deletion app/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/openebs/jiva/alertlog"
"github.com/openebs/jiva/sync"

"github.com/openebs/jiva/types"

Expand All @@ -21,7 +22,6 @@ import (
"github.com/openebs/jiva/replica"
"github.com/openebs/jiva/replica/rest"
"github.com/openebs/jiva/replica/rpc"
"github.com/openebs/jiva/sync"
"github.com/openebs/jiva/util"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
Expand Down Expand Up @@ -309,6 +309,7 @@ func startReplica(c *cli.Context) error {
return err
}
}

select {
case resp = <-controlResp:
alertlog.Logger.Errorw("",
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
2 changes: 1 addition & 1 deletion replica/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func drainHoleCreatorChan() {
}

//CreateHoles removes the offsets from corresponding sparse files
func CreateHoles() error {
func CreateHoles() {
var (
fd uintptr
)
Expand Down
76 changes: 62 additions & 14 deletions replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (r *Replica) hardlinkDisk(target, source string) error {
if err := os.Link(r.diskPath(source), r.diskPath(target)); err != nil {
return fmt.Errorf("Fail to link %s to %s", source, target)
}
return nil
return r.syncDir()
}

// ReplaceDisk replace the source with target snapshot
Expand Down Expand Up @@ -716,6 +716,17 @@ func (r *Replica) isBackingFile(index int) bool {
return index == 1
}

// closeAndSyncDir closes f and, only if the close succeeded, syncs the
// replica directory through r.syncDir. The first error encountered is
// returned; a failed close skips the directory sync.
func (r *Replica) closeAndSyncDir(f types.DiffDisk) error {
	if closeErr := f.Close(); closeErr != nil {
		return closeErr
	}
	return r.syncDir()
}

func (r *Replica) close() error {
for i, f := range r.volume.files {
if f != nil && !r.isBackingFile(i) {
Expand All @@ -733,22 +744,27 @@ func (r *Replica) encodeToFile(obj interface{}, file string) error {

f, err := os.Create(r.diskPath(file + ".tmp"))
if err != nil {
logrus.Errorf("failed to create temp file: %s while encoding the data to file", file)
logrus.Errorf("Failed to create temp file: %s while encoding the data to file", file)
return err
}
defer f.Close()

if err := json.NewEncoder(f).Encode(&obj); err != nil {
logrus.Errorf("failed to encode the data to file: %s", f.Name())
return err
// Test the error returned by Encode itself. The outer err (from os.Create)
// is always nil at this point, so checking it would silently drop encode
// failures and let a truncated temp file be renamed into place.
if lastErr := json.NewEncoder(f).Encode(&obj); lastErr != nil {
	// Close the temp file before bailing out; a close failure is only
	// logged so that the encode error is the one reported to the caller.
	if err := f.Close(); err != nil {
		logrus.Errorf("Failed to close file: %v, err: %v", f.Name(), err)
	}
	logrus.Errorf("Failed to encode the data to file: %s", f.Name())
	return lastErr
}

if err := f.Close(); err != nil {
logrus.Errorf("failed to close file after encoding to file: %s", f.Name())
logrus.Errorf("Failed to close file: %v after encoding", f.Name())
return err
}

return os.Rename(r.diskPath(file+".tmp"), r.diskPath(file))
if err := os.Rename(r.diskPath(file+".tmp"), r.diskPath(file)); err != nil {
return err
}
return r.syncDir()
}

func (r *Replica) nextFile(parsePattern *regexp.Regexp, pattern, parent string) (string, error) {
Expand All @@ -769,6 +785,22 @@ func (r *Replica) openFile(name string, flag int) (types.DiffDisk, error) {
return sparse.NewDirectFileIoProcessor(r.diskPath(name), os.O_RDWR|flag, 06666, true)
}

// syncDir opens the replica directory (r.dir), fsyncs it and closes it again.
// Creating or deleting a file must be followed by a sync of the containing
// directory to guarantee the change is visible across system crashes; see the
// fsync man page for details.
//
// An open failure is returned as is. Otherwise the directory is always
// closed, and a sync error takes precedence over a close error.
func (r *Replica) syncDir() error {
	dir, err := os.Open(r.dir)
	if err != nil {
		return err
	}
	if syncErr := dir.Sync(); syncErr != nil {
		// Still release the descriptor; the sync failure is what gets reported.
		_ = dir.Close()
		return syncErr
	}
	return dir.Close()
}

func (r *Replica) createNewHead(oldHead, parent, created string) (types.DiffDisk, disk, error) {
newHeadName, err := r.nextFile(diskPattern, headName, oldHead)
if err != nil {
Expand All @@ -783,6 +815,8 @@ func (r *Replica) createNewHead(oldHead, parent, created string) (types.DiffDisk
if err != nil {
return nil, disk{}, err
}

// file created before this needs to be deleted in case of error
if err := syscall.Truncate(r.diskPath(newHeadName), r.info.Size); err != nil {
return nil, disk{}, err
}
Expand Down Expand Up @@ -816,7 +850,10 @@ func (r *Replica) linkDisk(oldname, newname string) error {
return err
}

return os.Link(r.diskPath(oldname+metadataSuffix), r.diskPath(newname+metadataSuffix))
if err := os.Link(r.diskPath(oldname+metadataSuffix), r.diskPath(newname+metadataSuffix)); err != nil {
return err
}
return r.syncDir()
}

func (r *Replica) markDiskAsRemoved(name string) error {
Expand Down Expand Up @@ -844,6 +881,11 @@ func (r *Replica) rmDisk(name string) error {
if err := os.Remove(r.diskPath(name + metadataSuffix)); err != nil {
lastErr = err
}

if lastErr == nil {
lastErr = r.syncDir()
}

return lastErr
}

Expand Down Expand Up @@ -909,7 +951,7 @@ func (r *Replica) createDisk(name string, userCreated bool, created string) erro
if !done {
r.rmDisk(newHeadDisk.Name)
r.rmDisk(newSnapName)
f.Close()
f.Close() // rm only unlink the file since fd is still open
return
}
r.rmDisk(oldHead)
Expand Down Expand Up @@ -1115,10 +1157,16 @@ func (r *Replica) unmarshalFile(file string, obj interface{}) error {
if err != nil {
return err
}
defer f.Close()

dec := json.NewDecoder(f)
return dec.Decode(obj)
err = json.NewDecoder(f).Decode(obj)
if err != nil {
if closeErr := f.Close(); closeErr != nil {
logrus.Errorf("Fail to close file: %v, err: %v", f.Name(), closeErr)
}
return err
}

return f.Close()
}

func (r *Replica) Close() error {
Expand Down Expand Up @@ -1152,7 +1200,7 @@ func (r *Replica) Delete() error {
logrus.Error("Error in removing revision counter file, error : ", err.Error())
return err
}
return nil
return r.syncDir()
}

func (r *Replica) DeleteAll() error {
Expand Down
36 changes: 34 additions & 2 deletions replica/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Server struct {
//between controller and replica. If the connection is broken,
//the replica attempts to connect back to controller
MonitorChannel chan struct{}
//closeSync chan struct{}
}

func NewServer(dir string, sectorSize int64, serverType string) *Server {
Expand All @@ -49,6 +50,7 @@ func NewServer(dir string, sectorSize int64, serverType string) *Server {
defaultSectorSize: sectorSize,
ServerType: serverType,
MonitorChannel: make(chan struct{}),
// closeSync: make(chan struct{}),
}
}

Expand Down Expand Up @@ -176,6 +178,34 @@ func (s *Server) Open() error {
return nil
}

/*
TODO: Enabling periodic sync will slow down the replica a bit; we
need to verify how much of a penalty we pay by enabling it.
func (s *Server) periodicSync() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
logrus.Info("Start periodic sync")
for {
select {
case <-s.closeSync:
logrus.Info("Stop periodic sync")
return
case <-ticker.C:
s.RLock()
if s.r == nil {
logrus.Warning("Stop periodic sync as s.r not set")
s.RUnlock()
return
}
if _, err := s.r.Sync(); err != nil {
logrus.Warningf("Fail to sync, err: %v", err)
}
s.RUnlock()
}
}
}
*/

func (s *Server) Reload() error {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -439,8 +469,8 @@ func (s *Server) DeleteAll() error {
func (s *Server) Close() error {
logrus.Infof("Closing replica")
s.Lock()

defer s.Unlock()

if s.r == nil {
logrus.Infof("Skip closing replica, s.r not set")
return nil
Expand All @@ -449,8 +479,10 @@ func (s *Server) Close() error {
// r.holeDrainer is initialized at construct
// function in replica.go
s.r.holeDrainer()

// notify periodicSync go routine to stop
//s.closeSync <- struct{}{}
if err := s.r.Close(); err != nil {
logrus.Errorf("Failed to close replica, err: %v", err)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ReaderWriterAt interface {

// IOs combines the embedded ReaderWriterAt and io.Closer interfaces with
// the Sync and Unmap methods. It is embedded by Backend.
type IOs interface {
ReaderWriterAt
// io.Closer contributes Close() error to the method set.
io.Closer
// Sync returns an int and an error.
// NOTE(review): presumably flushes pending data to stable storage; the
// meaning of the int result is not visible here — confirm against implementations.
Sync() (int, error)
// Unmap takes two int64 arguments and returns an int and an error.
// NOTE(review): the arguments are presumably the offset and length of the
// range to release — confirm against implementations.
Unmap(int64, int64) (int, error)
}
Expand Down Expand Up @@ -68,7 +69,6 @@ var (

type Backend interface {
IOs
io.Closer
Snapshot(name string, userCreated bool, created string) error
Resize(name string, size string) error
Size() (int64, error)
Expand Down

0 comments on commit 2b621d1

Please sign in to comment.