diff --git a/fd.go b/fd.go index ea04bc9..35b945c 100644 --- a/fd.go +++ b/fd.go @@ -7,6 +7,16 @@ import ( mod "github.com/ipfs/go-unixfs/mod" context "context" + + ipld "github.com/ipfs/go-ipld-format" +) + +type state uint8 + +const ( + stateFlushed state = iota + stateDirty + stateClosed ) // One `File` can have many `FileDescriptor`s associated to it @@ -31,14 +41,31 @@ type FileDescriptor interface { } type fileDescriptor struct { - inode *File - mod *mod.DagModifier - perms int - sync bool - hasChanges bool - - // TODO: Where is this variable set? - closed bool + inode *File + mod *mod.DagModifier + flags Flags + + state state +} + +func (fi *fileDescriptor) checkWrite() error { + if fi.state == stateClosed { + return ErrClosed + } + if !fi.flags.Write { + return fmt.Errorf("file is read-only") + } + return nil +} + +func (fi *fileDescriptor) checkRead() error { + if fi.state == stateClosed { + return ErrClosed + } + if !fi.flags.Read { + return fmt.Errorf("file is write-only") + } + return nil } // Size returns the size of the file referred to by this descriptor @@ -48,34 +75,34 @@ func (fi *fileDescriptor) Size() (int64, error) { // Truncate truncates the file to size func (fi *fileDescriptor) Truncate(size int64) error { - if fi.perms == OpenReadOnly { - return fmt.Errorf("cannot call truncate on readonly file descriptor") + if err := fi.checkWrite(); err != nil { + return fmt.Errorf("truncate failed: %s", err) } - fi.hasChanges = true + fi.state = stateDirty return fi.mod.Truncate(size) } // Write writes the given data to the file at its current offset func (fi *fileDescriptor) Write(b []byte) (int, error) { - if fi.perms == OpenReadOnly { - return 0, fmt.Errorf("cannot write on not writeable descriptor") + if err := fi.checkWrite(); err != nil { + return 0, fmt.Errorf("write failed: %s", err) } - fi.hasChanges = true + fi.state = stateDirty return fi.mod.Write(b) } // Read reads into the given buffer from the current offset func (fi *fileDescriptor) Read(b []byte) (int, error) { - if fi.perms == OpenWriteOnly { - return 0, fmt.Errorf("cannot read on write-only descriptor") + if err := fi.checkRead(); err != nil { + return 0, fmt.Errorf("read failed: %s", err) } return fi.mod.Read(b) } // Read reads into the given buffer from the current offset func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) { - if fi.perms == OpenWriteOnly { - return 0, fmt.Errorf("cannot read on write-only descriptor") + if err := fi.checkRead(); err != nil { + return 0, fmt.Errorf("read failed: %s", err) } return fi.mod.CtxReadFull(ctx, b) } @@ -83,34 +110,17 @@ func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error // Close flushes, then propogates the modified dag node up the directory structure // and signals a republish to occur func (fi *fileDescriptor) Close() error { - defer func() { - switch fi.perms { - case OpenReadOnly: - fi.inode.desclock.RUnlock() - case OpenWriteOnly, OpenReadWrite: - fi.inode.desclock.Unlock() - } - // TODO: `closed` should be set here. - }() - - if fi.closed { - panic("attempted to close file descriptor twice!") + if fi.state == stateClosed { + return ErrClosed } - - if fi.hasChanges { - err := fi.mod.Sync() - if err != nil { - return err - } - - fi.hasChanges = false - - // explicitly stay locked for flushUp call, - // it will manage the lock for us - return fi.flushUp(fi.sync) + if fi.flags.Write { + defer fi.inode.desclock.Unlock() + } else if fi.flags.Read { + defer fi.inode.desclock.RUnlock() } - - return nil + err := fi.flushUp(fi.flags.Sync) + fi.state = stateClosed + return err } // Flush generates a new version of the node of the underlying @@ -126,47 +136,57 @@ func (fi *fileDescriptor) Flush() error { // If `fullSync` is set the changes are propagated upwards // (the `Up` part of `flushUp`). func (fi *fileDescriptor) flushUp(fullSync bool) error { - nd, err := fi.mod.GetNode() - if err != nil { - return err - } + var nd ipld.Node + switch fi.state { + case stateDirty: + // calls mod.Sync internally. + var err error + nd, err = fi.mod.GetNode() + if err != nil { + return err + } + err = fi.inode.dagService.Add(context.TODO(), nd) + if err != nil { + return err + } + fi.inode.nodeLock.Lock() + fi.inode.node = nd + fi.inode.nodeLock.Unlock() + fallthrough + case stateFlushed: + if !fullSync { + return nil + } - err = fi.inode.dagService.Add(context.TODO(), nd) - if err != nil { - return err - } - // TODO: Very similar logic to the update process in - // `Directory`, the logic should be unified, both structures - // (`File` and `Directory`) are backed by a IPLD node with - // a UnixFS format that is the actual target of the update - // (regenerating it and adding it to the DAG service). - - fi.inode.nodeLock.Lock() - fi.inode.node = nd - // TODO: Create a `SetNode` method. - name := fi.inode.name - parent := fi.inode.parent - // TODO: Can the parent be modified? Do we need to do this inside the lock? - fi.inode.nodeLock.Unlock() - // TODO: Maybe all this logic should happen in `File`. - - if fullSync { - return parent.updateChildEntry(child{name, nd}) - } + fi.inode.nodeLock.Lock() + nd = fi.inode.node + parent := fi.inode.parent + name := fi.inode.name + fi.inode.nodeLock.Unlock() - return nil + if err := parent.updateChildEntry(child{name, nd}); err != nil { + return err + } + fi.state = stateFlushed + return nil + default: + panic("invalid state") + } } // Seek implements io.Seeker func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) { + if fi.state == stateClosed { + return 0, fmt.Errorf("seek failed: %s", ErrClosed) + } return fi.mod.Seek(offset, whence) } // Write At writes the given bytes at the offset 'at' func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) { - if fi.perms == OpenReadOnly { - return 0, fmt.Errorf("cannot write on not writeable descriptor") + if err := fi.checkWrite(); err != nil { + return 0, fmt.Errorf("write-at failed: %s", err) } - fi.hasChanges = true + fi.state = stateDirty return fi.mod.WriteAt(b, at) } diff --git a/file.go b/file.go index 7a20fdf..fd2eb28 100644 --- a/file.go +++ b/file.go @@ -26,7 +26,7 @@ type File struct { // entire DAG of nodes that comprise the file. // TODO: Rename, there should be an explicit term for these root nodes // of a particular sub-DAG that abstract an upper layer's entity. - node ipld.Node + node ipld.Node // Lock around the `node` that represents this file, necessary because // there may be many `FileDescriptor`s operating on this `File`. @@ -52,13 +52,25 @@ func NewFile(name string, node ipld.Node, parent parent, dserv ipld.DAGService) return fi, nil } -const ( - OpenReadOnly = iota - OpenWriteOnly - OpenReadWrite -) +func (fi *File) Open(flags Flags) (_ FileDescriptor, _retErr error) { + if flags.Write { + fi.desclock.Lock() + defer func() { + if _retErr != nil { + fi.desclock.Unlock() + } + }() + } else if flags.Read { + fi.desclock.RLock() + defer func() { + if _retErr != nil { + fi.desclock.Unlock() + } + }() + } else { + return nil, fmt.Errorf("file opened for neither reading nor writing") + } -func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) { fi.nodeLock.RLock() node := fi.node fi.nodeLock.RUnlock() @@ -86,16 +98,6 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) { // Ok as well. } - switch flags { - case OpenReadOnly: - fi.desclock.RLock() - case OpenWriteOnly, OpenReadWrite: - fi.desclock.Lock() - default: - // TODO: support other modes - return nil, fmt.Errorf("mode not supported") - } - dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dagService, chunker.DefaultSplitter) // TODO: Remove the use of the `chunker` package here, add a new `NewDagModifier` in // `go-unixfs` with the `DefaultSplitter` already included. @@ -106,8 +108,7 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) { return &fileDescriptor{ inode: fi, - perms: flags, - sync: sync, + flags: flags, mod: dmod, }, nil } @@ -153,7 +154,7 @@ func (fi *File) GetNode() (ipld.Node, error) { // a file without flushing?) func (fi *File) Flush() error { // open the file in fullsync mode - fd, err := fi.Open(OpenWriteOnly, true) + fd, err := fi.Open(Flags{Write: true, Sync: true}) if err != nil { return err } diff --git a/mfs_test.go b/mfs_test.go index 2075346..8112d8a 100644 --- a/mfs_test.go +++ b/mfs_test.go @@ -14,9 +14,10 @@ import ( "testing" "time" + path "github.com/ipfs/go-path" + bserv "github.com/ipfs/go-blockservice" dag "github.com/ipfs/go-merkledag" - "github.com/ipfs/go-path" ft "github.com/ipfs/go-unixfs" importer "github.com/ipfs/go-unixfs/importer" uio "github.com/ipfs/go-unixfs/io" @@ -161,7 +162,7 @@ func assertFileAtPath(ds ipld.DAGService, root *Directory, expn ipld.Node, pth s return fmt.Errorf("%s was not a file", pth) } - rfd, err := file.Open(OpenReadOnly, false) + rfd, err := file.Open(Flags{Read: true}) if err != nil { return err } @@ -394,7 +395,7 @@ func TestMfsFile(t *testing.T) { t.Fatal("some is seriously wrong here") } - wfd, err := fi.Open(OpenReadWrite, true) + wfd, err := fi.Open(Flags{Read: true, Write: true, Sync: true}) if err != nil { t.Fatal(err) } @@ -554,7 +555,7 @@ func actorMakeFile(d *Directory) error { return err } - wfd, err := f.Open(OpenWriteOnly, true) + wfd, err := f.Open(Flags{Write: true, Sync: true}) if err != nil { return err } @@ -634,7 +635,7 @@ func actorWriteFile(d *Directory) error { return err } - wfd, err := fi.Open(OpenWriteOnly, true) + wfd, err := fi.Open(Flags{Write: true, Sync: true}) if err != nil { return err } @@ -666,7 +667,7 @@ func actorReadFile(d *Directory) error { return err } - rfd, err := fi.Open(OpenReadOnly, false) + rfd, err := fi.Open(Flags{Read: true}) if err != nil { return err } @@ -868,7 +869,7 @@ func readFile(rt *Root, path string, offset int64, buf []byte) error { return fmt.Errorf("%s was not a file", path) } - fd, err := fi.Open(OpenReadOnly, false) + fd, err := fi.Open(Flags{Read: true}) if err != nil { return err } @@ -946,7 +947,7 @@ func writeFile(rt *Root, path string, data []byte) error { return fmt.Errorf("expected to receive a file, but didnt get one") } - fd, err := fi.Open(OpenWriteOnly, true) + fd, err := fi.Open(Flags{Write: true, Sync: true}) if err != nil { return err } @@ -1014,7 +1015,7 @@ func TestFileDescriptors(t *testing.T) { } // test read only - rfd1, err := fi.Open(OpenReadOnly, false) + rfd1, err := fi.Open(Flags{Read: true}) if err != nil { t.Fatal(err) } @@ -1038,7 +1039,7 @@ func TestFileDescriptors(t *testing.T) { go func() { defer close(done) // can open second readonly file descriptor - rfd2, err := fi.Open(OpenReadOnly, false) + rfd2, err := fi.Open(Flags{Read: true}) if err != nil { t.Error(err) return @@ -1061,7 +1062,7 @@ func TestFileDescriptors(t *testing.T) { done = make(chan struct{}) go func() { defer close(done) - wfd1, err := fi.Open(OpenWriteOnly, true) + wfd1, err := fi.Open(Flags{Write: true, Sync: true}) if err != nil { t.Error(err) } @@ -1090,7 +1091,7 @@ func TestFileDescriptors(t *testing.T) { case <-done: } - wfd, err := fi.Open(OpenWriteOnly, true) + wfd, err := fi.Open(Flags{Write: true, Sync: true}) if err != nil { t.Fatal(err) } @@ -1119,7 +1120,7 @@ func TestTruncateAtSize(t *testing.T) { t.Fatal(err) } - fd, err := fi.Open(OpenReadWrite, true) + fd, err := fi.Open(Flags{Read: true, Write: true, Sync: true}) if err != nil { t.Fatal(err) } @@ -1144,7 +1145,7 @@ func TestTruncateAndWrite(t *testing.T) { t.Fatal(err) } - fd, err := fi.Open(OpenReadWrite, true) + fd, err := fi.Open(Flags{Read: true, Write: true, Sync: true}) defer fd.Close() if err != nil { t.Fatal(err) diff --git a/options.go b/options.go new file mode 100644 index 0000000..1edb99e --- /dev/null +++ b/options.go @@ -0,0 +1,7 @@ +package mfs + +type Flags struct { + Read bool + Write bool + Sync bool +} diff --git a/repub.go b/repub.go index 1efda7b..12738fa 100644 --- a/repub.go +++ b/repub.go @@ -46,13 +46,6 @@ func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration // WaitPub waits for the current value to be published (or returns early // if it already has). func (rp *Republisher) WaitPub() { - rp.valueLock.Lock() - valueHasBeenPublished := rp.lastValuePublished == rp.valueToPublish - rp.valueLock.Unlock() - if valueHasBeenPublished { - return - } - wait := make(chan struct{}) rp.immediatePublish <- wait <-wait diff --git a/root.go b/root.go index 9810961..cbce68d 100644 --- a/root.go +++ b/root.go @@ -18,6 +18,7 @@ import ( // TODO: Remove if not used. var ErrNotExist = errors.New("no such rootfs") +var ErrClosed = errors.New("file closed") var log = logging.Logger("mfs")