diff --git a/Makefile b/Makefile index b0428e74..2a5a6f06 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,7 @@ help: @echo "Usage:-" @echo "\tmake build -- will create jiva image" @echo "\tmake deps -- will verify build dependencies are installed" + @echo "\tARCH = $(ARCH) -- arch where make is running" @echo "" @@ -61,10 +62,9 @@ mod: go.mod go.sum deps: _build_check_go _build_check_docker mod @echo "INFO:\tVerifying dependencies for jiva" - _run_ci: @echo "INFO:\tRun ci over jiva image" - sudo bash ./ci/start_init_test.sh + sudo -E bash ./ci/start_init_test.sh test: @echo "INFO:\tRun ci over jiva image" diff --git a/app/replica.go b/app/replica.go index 1f3208e6..f7a0e461 100644 --- a/app/replica.go +++ b/app/replica.go @@ -13,6 +13,7 @@ import ( "time" "github.com/openebs/jiva/alertlog" + "github.com/openebs/jiva/sync" "github.com/openebs/jiva/types" @@ -21,7 +22,6 @@ import ( "github.com/openebs/jiva/replica" "github.com/openebs/jiva/replica/rest" "github.com/openebs/jiva/replica/rpc" - "github.com/openebs/jiva/sync" "github.com/openebs/jiva/util" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -309,6 +309,7 @@ func startReplica(c *cli.Context) error { return err } } + select { case resp = <-controlResp: alertlog.Logger.Errorw("", diff --git a/go.sum b/go.sum index b2425df8..76562a9e 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/replica/backup.go b/replica/backup.go index 47627e45..adc1c592 100644 --- a/replica/backup.go +++ b/replica/backup.go @@ -213,7 +213,7 @@ func drainHoleCreatorChan() { } //CreateHoles removes the offsets from corresponding sparse files -func CreateHoles() error { +func CreateHoles() { var ( fd uintptr ) diff --git a/replica/replica.go b/replica/replica.go index e509e6f1..45cf9bfc 100644 --- a/replica/replica.go +++ b/replica/replica.go @@ -424,7 +424,7 @@ func (r *Replica) hardlinkDisk(target, source string) error { if err := os.Link(r.diskPath(source), r.diskPath(target)); err != nil { return fmt.Errorf("Fail to link %s to %s", source, target) } - return nil + return r.syncDir() } // ReplaceDisk replace the source with target snapshot @@ -716,6 +716,17 @@ func (r *Replica) isBackingFile(index int) bool { return index == 1 } +func (r *Replica) closeAndSyncDir(f types.DiffDisk) error { + err := f.Close() + if err != nil { + return err + } + if err = r.syncDir(); err != nil { + return err + } + return nil +} + func (r *Replica) close() error { for i, f := range r.volume.files { if f != nil && !r.isBackingFile(i) { @@ -733,22 +744,27 @@ func (r *Replica) encodeToFile(obj interface{}, file string) error { f, err := os.Create(r.diskPath(file + ".tmp")) if err != nil { - logrus.Errorf("failed to create temp file: %s while encoding the data to file", file) + logrus.Errorf("Failed to create temp file: %s while encoding the data to file", file) return err } - defer f.Close() - if err := json.NewEncoder(f).Encode(&obj); err != nil { - logrus.Errorf("failed to encode the data to file: %s", f.Name()) - return err + if lastErr := json.NewEncoder(f).Encode(&obj); err != nil { + if err := f.Close(); err != nil { + logrus.Errorf("Failed to close file: %v, err: %v", f.Name(), err) + } + logrus.Errorf("Failed to encode the data to file: %s", f.Name()) + return lastErr } if err := f.Close(); err != nil { - logrus.Errorf("failed to close file after encoding to file: %s", f.Name()) + logrus.Errorf("Failed to close file: %v after encoding", f.Name()) return err } - return os.Rename(r.diskPath(file+".tmp"), r.diskPath(file)) + if err := os.Rename(r.diskPath(file+".tmp"), r.diskPath(file)); err != nil { + return err + } + return r.syncDir() } func (r *Replica) nextFile(parsePattern *regexp.Regexp, pattern, parent string) (string, error) { @@ -769,6 +785,22 @@ func (r *Replica) openFile(name string, flag int) (types.DiffDisk, error) { return sparse.NewDirectFileIoProcessor(r.diskPath(name), os.O_RDWR|flag, 06666, true) } +// after creating or deleting the file the directory also needs to be synced +// in order to guarantee the file is visible across system crashes. See man +// page of fsync for more details. +func (r *Replica) syncDir() error { + f, err := os.Open(r.dir) + if err != nil { + return err + } + err = f.Sync() + closeErr := f.Close() + if err != nil { + return err + } + return closeErr +} + func (r *Replica) createNewHead(oldHead, parent, created string) (types.DiffDisk, disk, error) { newHeadName, err := r.nextFile(diskPattern, headName, oldHead) if err != nil { @@ -783,6 +815,8 @@ func (r *Replica) createNewHead(oldHead, parent, created string) (types.DiffDisk if err != nil { return nil, disk{}, err } + + // file created before this needs to be deleted in case of error if err := syscall.Truncate(r.diskPath(newHeadName), r.info.Size); err != nil { return nil, disk{}, err } @@ -816,7 +850,10 @@ func (r *Replica) linkDisk(oldname, newname string) error { return err } - return os.Link(r.diskPath(oldname+metadataSuffix), r.diskPath(newname+metadataSuffix)) + if err := os.Link(r.diskPath(oldname+metadataSuffix), r.diskPath(newname+metadataSuffix)); err != nil { + return err + } + return r.syncDir() } func (r *Replica) markDiskAsRemoved(name string) error { @@ -844,6 +881,11 @@ func (r *Replica) rmDisk(name string) error { if err := os.Remove(r.diskPath(name + metadataSuffix)); err != nil { lastErr = err } + + if lastErr == nil { + lastErr = r.syncDir() + } + return lastErr } @@ -909,7 +951,7 @@ func (r *Replica) createDisk(name string, userCreated bool, created string) erro if !done { r.rmDisk(newHeadDisk.Name) r.rmDisk(newSnapName) - f.Close() + f.Close() // rm only unlink the file since fd is still open return } r.rmDisk(oldHead) @@ -1115,10 +1157,16 @@ func (r *Replica) unmarshalFile(file string, obj interface{}) error { if err != nil { return err } - defer f.Close() - dec := json.NewDecoder(f) - return dec.Decode(obj) + err = json.NewDecoder(f).Decode(obj) + if err != nil { + if closeErr := f.Close(); closeErr != nil { + logrus.Errorf("Fail to close file: %v, err: %v", f.Name(), closeErr) + } + return err + } + + return f.Close() } func (r *Replica) Close() error { @@ -1152,7 +1200,7 @@ func (r *Replica) Delete() error { logrus.Error("Error in removing revision counter file, error : ", err.Error()) return err } - return nil + return r.syncDir() } func (r *Replica) DeleteAll() error { diff --git a/replica/server.go b/replica/server.go index 7efe2269..32d74971 100644 --- a/replica/server.go +++ b/replica/server.go @@ -38,6 +38,7 @@ type Server struct { //between controller and replica. If the connection is broken, //the replica attempts to connect back to controller MonitorChannel chan struct{} + //closeSync chan struct{} } func NewServer(dir string, sectorSize int64, serverType string) *Server { @@ -49,6 +50,7 @@ func NewServer(dir string, sectorSize int64, serverType string) *Server { defaultSectorSize: sectorSize, ServerType: serverType, MonitorChannel: make(chan struct{}), + // closeSync: make(chan struct{}), } } @@ -176,6 +178,34 @@ func (s *Server) Open() error { return nil } +/* +TODO: Enabling periodic sync will slow down replica a bit +need to verify how much penalty we have to pay by Enabling it. +func (s *Server) periodicSync() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + logrus.Info("Start periodic sync") + for { + select { + case <-s.closeSync: + logrus.Info("Stop periodic sync") + return + case <-ticker.C: + s.RLock() + if s.r == nil { + logrus.Warning("Stop periodic sync as s.r not set") + s.RUnlock() + return + } + if _, err := s.r.Sync(); err != nil { + logrus.Warningf("Fail to sync, err: %v", err) + } + s.RUnlock() + } + } +} +*/ + func (s *Server) Reload() error { s.Lock() defer s.Unlock() @@ -439,8 +469,8 @@ func (s *Server) DeleteAll() error { func (s *Server) Close() error { logrus.Infof("Closing replica") s.Lock() - defer s.Unlock() + if s.r == nil { logrus.Infof("Skip closing replica, s.r not set") return nil @@ -449,8 +479,10 @@ func (s *Server) Close() error { // r.holeDrainer is initialized at construct // function in replica.go s.r.holeDrainer() - + // notify periodicSync go routine to stop + //s.closeSync <- struct{}{} if err := s.r.Close(); err != nil { + logrus.Errorf("Failed to close replica, err: %v", err) return err } diff --git a/types/types.go b/types/types.go index b075cd5e..82cc9caa 100644 --- a/types/types.go +++ b/types/types.go @@ -39,6 +39,7 @@ type ReaderWriterAt interface { type IOs interface { ReaderWriterAt + io.Closer Sync() (int, error) Unmap(int64, int64) (int, error) } @@ -68,7 +69,6 @@ var ( type Backend interface { IOs - io.Closer Snapshot(name string, userCreated bool, created string) error Resize(name string, size string) error Size() (int64, error)