From 0c867ba0bf632fd4256d2686132fa5b582bac1f4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 5 Dec 2019 13:16:33 -0500 Subject: [PATCH] support async datastores --- core/coreapi/unixfs.go | 34 ++++++++++++++++++++++++++++++- core/coreunix/add.go | 11 ++++++++++ core/node/core.go | 33 +++++++++++++++++++++++++++--- go.mod | 2 +- go.sum | 4 ++-- namesys/publisher.go | 6 +++++- namesys/publisher_test.go | 43 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 125 insertions(+), 8 deletions(-) diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index b30814021fe3..ec076dac6f93 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -12,6 +12,7 @@ import ( blockservice "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" cidutil "github.com/ipfs/go-cidutil" + filestore "github.com/ipfs/go-filestore" bstore "github.com/ipfs/go-ipfs-blockstore" files "github.com/ipfs/go-ipfs-files" ipld "github.com/ipfs/go-ipld-format" @@ -96,7 +97,29 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options bserv := blockservice.New(addblockstore, exch) // hash security 001 dserv := dag.NewDAGService(bserv) - fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, dserv) + // add a sync call to the DagService + // this ensures that data written to the DagService is persisted to the underlying datastore + // TODO: propagate the Sync function from the datastore through the blockstore, blockservice and dagservice + var syncDserv *syncDagService + if settings.OnlyHash { + syncDserv = &syncDagService{ + DAGService: dserv, + syncFn: func(c cid.Cid) error { return nil }, + } + } else { + syncDserv = &syncDagService{ + DAGService: dserv, + syncFn: func(c cid.Cid) error { + ds := api.repo.Datastore() + if err := ds.Sync(bstore.BlockPrefix.ChildString(c.String())); err != nil { + return err + } + return ds.Sync(filestore.FilestorePrefix) + }, + } + } + + fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, syncDserv) if err != nil { return nil, err } @@ -272,3 +295,12 @@ func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, set func (api *UnixfsAPI) core() *CoreAPI { return (*CoreAPI)(api) } + +type syncDagService struct { + ipld.DAGService + syncFn func(c cid.Cid) error +} + +func (s *syncDagService) Sync(c cid.Cid) error { + return s.syncFn(c) +} diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 6b44226cecd7..c12d604a2925 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -38,6 +38,10 @@ type Link struct { Size uint64 } +type syncer interface { + Sync(c cid.Cid) error +} + // NewAdder Returns a new Adder used for a file add operation. func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService) (*Adder, error) { bufferedDS := ipld.NewBufferedDAG(ctx, ds) @@ -316,6 +320,13 @@ func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) { return nil, err } + if asyncDagService, ok := adder.dagService.(syncer); ok { + err = asyncDagService.Sync(nd.Cid()) + if err != nil { + return nil, err + } + } + if !adder.Pin { return nd, nil } diff --git a/core/node/core.go b/core/node/core.go index b0b2d0ab1784..d04e4b3b9394 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-filestore" "github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-ipfs-exchange-interface" "github.com/ipfs/go-ipfs-exchange-offline" @@ -41,18 +42,33 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf // Pinning creates new pinner which tells GC which blocks should be kept func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) { internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))) - pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag) + rootDS := repo.Datastore() + + syncFn := func() error { return rootDS.Sync(blockstore.BlockPrefix) } + syncDs := &syncDagService{ds, syncFn} + syncInternalDag := &syncDagService{internalDag, syncFn} + + pinning, err := pin.LoadPinner(rootDS, syncDs, syncInternalDag) if err != nil { // TODO: we should move towards only running 'NewPinner' explicitly on // node init instead of implicitly here as a result of the pinner keys // not being found in the datastore. // this is kinda sketchy and could cause data loss - pinning = pin.NewPinner(repo.Datastore(), ds, internalDag) + pinning = pin.NewPinner(rootDS, syncDs, syncInternalDag) } return pinning, nil } +type syncDagService struct { + format.DAGService + syncFn func() error +} + +func (s *syncDagService) Sync() error { + return s.syncFn() +} + // Dag creates new DAGService func Dag(bs blockservice.BlockService) format.DAGService { return merkledag.NewDAGService(bs) @@ -77,7 +93,18 @@ func OnlineExchange(provide bool) interface{} { func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) { dsk := datastore.NewKey("/local/filesroot") pf := func(ctx context.Context, c cid.Cid) error { - return repo.Datastore().Put(dsk, c.Bytes()) + rootDS := repo.Datastore() + if err := rootDS.Sync(blockstore.BlockPrefix.ChildString(c.String())); err != nil { + return err + } + if err := rootDS.Sync(filestore.FilestorePrefix); err != nil { + return err + } + + if err := rootDS.Put(dsk, c.Bytes()); err != nil { + return err + } + return rootDS.Sync(dsk) } var nd *merkledag.ProtoNode diff --git a/go.mod b/go.mod index f54378b771d2..7f13376dfc31 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.4 - github.com/ipfs/go-ipfs-pinner v0.0.2 + github.com/ipfs/go-ipfs-pinner v0.0.3-0.20191205171545-5901eab20ba3 github.com/ipfs/go-ipfs-posinfo v0.0.1 github.com/ipfs/go-ipfs-provider v0.3.0 github.com/ipfs/go-ipfs-routing v0.1.0 diff --git a/go.sum b/go.sum index 280e9af19ddd..27787bff9cdf 100644 --- a/go.sum +++ b/go.sum @@ -216,8 +216,8 @@ github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN github.com/ipfs/go-ipfs-files v0.0.4 h1:WzRCivcybUQch/Qh6v8LBRhKtRsjnwyiuOV09mK7mrE= github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4= github.com/ipfs/go-ipfs-flags v0.0.1/go.mod h1:RnXBb9WV53GSfTrSDVK61NLTFKvWc60n+K9EgCDh+rA= -github.com/ipfs/go-ipfs-pinner v0.0.2 h1:KRXt2V0TzoTd3mO1aONSw8C9wnZtl7RLpPruN/XDnlQ= -github.com/ipfs/go-ipfs-pinner v0.0.2/go.mod h1:KZGyGAR+yLthGEkG9tuA2zweB7O6auXaJNjX6IbEbOs= +github.com/ipfs/go-ipfs-pinner v0.0.3-0.20191205171545-5901eab20ba3 h1:pm8ztvVQ/JyiJJxby3eERLa5hoCFzPKszMnX2KN82mg= +github.com/ipfs/go-ipfs-pinner v0.0.3-0.20191205171545-5901eab20ba3/go.mod h1:s4kFZWLWGDudN8Jyd/GTpt222A12C2snA2+OTdy/7p8= github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs= github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A= github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU= diff --git a/namesys/publisher.go b/namesys/publisher.go index f5e335cd3163..1fa0c96c9c44 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -179,7 +179,11 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value pa } // Put the new record. - if err := p.ds.Put(IpnsDsKey(id), data); err != nil { + key := IpnsDsKey(id) + if err := p.ds.Put(key, data); err != nil { + return nil, err + } + if err := p.ds.Sync(key); err != nil { return nil, err } return entry, nil diff --git a/namesys/publisher_test.go b/namesys/publisher_test.go index 0b7b2c939997..625103383535 100644 --- a/namesys/publisher_test.go +++ b/namesys/publisher_test.go @@ -3,6 +3,7 @@ package namesys import ( "context" "crypto/rand" + "github.com/ipfs/go-path" "testing" "time" @@ -110,3 +111,45 @@ func TestRSAPublisher(t *testing.T) { func TestEd22519Publisher(t *testing.T) { testNamekeyPublisher(t, ci.Ed25519, ds.ErrNotFound, false) } + +func TestAsyncDS(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rt := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t)) + ds := &checkSyncDS{ + Datastore: ds.NewMapDatastore(), + syncKeys: make(map[ds.Key]struct{}), + } + publisher := NewIpnsPublisher(rt, ds) + + ipnsFakeID := testutil.RandIdentityOrFatal(t) + ipnsVal, err := path.ParsePath("/ipns/foo.bar") + if err != nil { + t.Fatal(err) + } + + if err := publisher.Publish(ctx, ipnsFakeID.PrivateKey(), ipnsVal); err != nil { + t.Fatal(err) + } + + ipnsKey := IpnsDsKey(ipnsFakeID.ID()) + + for k := range ds.syncKeys { + if k.IsAncestorOf(ipnsKey) || k.Equal(ipnsKey) { + return + } + } + + t.Fatal("ipns key not synced") +} + +type checkSyncDS struct { + ds.Datastore + syncKeys map[ds.Key]struct{} +} + +func (d *checkSyncDS) Sync(prefix ds.Key) error { + d.syncKeys[prefix] = struct{}{} + return d.Datastore.Sync(prefix) +}