Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wire a context in most of the data pipeline, connect it #7558

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions blocks/blockstoreutil/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func RmBlocks(ctx context.Context, blocks bs.GCBlockstore, pins pin.Pinner, cids
for _, c := range stillOkay {
// Kept for backwards compatibility. We may want to
// remove this sometime in the future.
has, err := blocks.Has(c)
has, err := blocks.Has(ctx, c)
if err != nil {
out <- &RemovedBlock{Hash: c.String(), Error: err.Error()}
continue
Expand All @@ -58,7 +58,7 @@ func RmBlocks(ctx context.Context, blocks bs.GCBlockstore, pins pin.Pinner, cids
continue
}

err = blocks.DeleteBlock(c)
err = blocks.DeleteBlock(ctx, c)
if err != nil {
out <- &RemovedBlock{Hash: c.String(), Error: err.Error()}
} else if !opts.Quiet {
Expand Down
2 changes: 1 addition & 1 deletion core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ Maximum supported CAR version: 1

ret := RootMeta{Cid: c}

if block, err := node.Blockstore.Get(c); err != nil {
if block, err := node.Blockstore.Get(req.Context, c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := ipld.Decode(block); err != nil {
ret.PinErrorMsg = err.Error()
Expand Down
2 changes: 1 addition & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ var provideRefDhtCmd = &cmds.Command{
return err
}

has, err := nd.Blockstore.Has(c)
has, err := nd.Blockstore.Has(req.Context, c)
if err != nil {
return err
}
Expand Down
15 changes: 8 additions & 7 deletions core/commands/filestore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -56,11 +57,11 @@ The output is:
}
args := req.Arguments
if len(args) > 0 {
return listByArgs(res, fs, args)
return listByArgs(req.Context, res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
next, err := filestore.ListAll(fs, fileOrder)
next, err := filestore.ListAll(req.Context, fs, fileOrder)
if err != nil {
return err
}
Expand Down Expand Up @@ -133,11 +134,11 @@ For ERROR entries the error will also be printed to stderr.
}
args := req.Arguments
if len(args) > 0 {
return listByArgs(res, fs, args)
return listByArgs(req.Context, res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
next, err := filestore.VerifyAll(fs, fileOrder)
next, err := filestore.VerifyAll(req.Context, fs, fileOrder)
if err != nil {
return err
}
Expand Down Expand Up @@ -206,7 +207,7 @@ var dupsFileStore = &cmds.Command{
}

for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
have, err := fs.MainBlockstore().Has(req.Context, cid)
if err != nil {
return res.Emit(&RefWrapper{Err: err.Error()})
}
Expand Down Expand Up @@ -235,7 +236,7 @@ func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, e
return n, fs, err
}

func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
func listByArgs(ctx context.Context, res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
Expand All @@ -248,7 +249,7 @@ func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string
}
continue
}
r := filestore.Verify(fs, c)
r := filestore.Verify(ctx, fs, c)
if err := res.Emit(r); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Ci
defer wg.Done()

for k := range keys {
_, err := bs.Get(k)
_, err := bs.Get(ctx, k)
if err != nil {
select {
case results <- fmt.Sprintf("block %s was corrupt (%s)", k, err):
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc
defer api.blockstore.PinLock().Unlock()
}

err = api.blocks.AddBlock(b)
err = api.blocks.AddBlock(ctx, b)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.D

c := rp.Cid()

has, err := api.blockstore.Has(c)
has, err := api.blockstore.Has(ctx, c)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
DAGService: dserv,
syncFn: func() error {
ds := api.repo.Datastore()
if err := ds.Sync(bstore.BlockPrefix); err != nil {
if err := ds.Sync(ctx, bstore.BlockPrefix); err != nil {
return err
}
return ds.Sync(filestore.FilestorePrefix)
return ds.Sync(ctx, filestore.FilestorePrefix)
},
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,16 @@ type testBlockstore struct {
countAtOffsetNonZero int
}

func (bs *testBlockstore) Put(block blocks.Block) error {
func (bs *testBlockstore) Put(ctx context.Context, block blocks.Block) error {
bs.CheckForPosInfo(block)
return bs.GCBlockstore.Put(block)
return bs.GCBlockstore.Put(ctx, block)
}

func (bs *testBlockstore) PutMany(blocks []blocks.Block) error {
func (bs *testBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
for _, blk := range blocks {
bs.CheckForPosInfo(blk)
}
return bs.GCBlockstore.PutMany(blocks)
return bs.GCBlockstore.PutMany(ctx, blocks)
}

func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) {
Expand Down
27 changes: 14 additions & 13 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,21 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
}

// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
func Pinning(mctx helpers.MetricsCtx, lc fx.Lifecycle, bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
rootDS := repo.Datastore()

syncFn := func() error {
if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
syncFn := func(ctx context.Context) error {
if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil {
return err
}
return rootDS.Sync(filestore.FilestorePrefix)
return rootDS.Sync(ctx, filestore.FilestorePrefix)
}
syncDs := &syncDagService{ds, syncFn}
syncInternalDag := &syncDagService{internalDag, syncFn}

pinning, err := pin.LoadPinner(rootDS, syncDs, syncInternalDag)
ctx := helpers.LifecycleCtx(mctx, lc)
pinning, err := pin.LoadPinner(ctx, rootDS, syncDs, syncInternalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicitly on
// node init instead of implicitly here as a result of the pinner keys
Expand All @@ -73,11 +74,11 @@ var (
// syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore
type syncDagService struct {
format.DAGService
syncFn func() error
syncFn func(ctx context.Context) error
}

func (s *syncDagService) Sync() error {
return s.syncFn()
func (s *syncDagService) Sync(ctx context.Context) error {
return s.syncFn(ctx)
}

func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
Expand Down Expand Up @@ -109,22 +110,22 @@ func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.
dsk := datastore.NewKey("/local/filesroot")
pf := func(ctx context.Context, c cid.Cid) error {
rootDS := repo.Datastore()
if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil {
return err
}
if err := rootDS.Sync(filestore.FilestorePrefix); err != nil {
if err := rootDS.Sync(ctx, filestore.FilestorePrefix); err != nil {
return err
}

if err := rootDS.Put(dsk, c.Bytes()); err != nil {
if err := rootDS.Put(ctx, dsk, c.Bytes()); err != nil {
return err
}
return rootDS.Sync(dsk)
return rootDS.Sync(ctx, dsk)
}

var nd *merkledag.ProtoNode
val, err := repo.Datastore().Get(dsk)
ctx := helpers.LifecycleCtx(mctx, lc)
val, err := repo.Datastore().Get(ctx, dsk)

switch {
case err == datastore.ErrNotFound || val == nil:
Expand Down
2 changes: 1 addition & 1 deletion gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
break loop
}
if !gcs.Has(k) {
err := bs.DeleteBlock(k)
err := bs.DeleteBlock(ctx, k)
removed++
if err != nil {
errors = true
Expand Down
25 changes: 23 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ require (
github.com/ipfs/go-filestore v0.0.3
github.com/ipfs/go-fs-lock v0.0.5
github.com/ipfs/go-graphsync v0.0.5
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-cmds v0.2.9
github.com/ipfs/go-ipfs-config v0.9.0
github.com/ipfs/go-ipfs-ds-help v0.1.1
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
Expand Down Expand Up @@ -115,4 +115,25 @@ require (
gopkg.in/cheggaaa/pb.v1 v1.0.28
)

replace (
github.com/ipfs/go-bitswap => github.com/MichaelMure/go-bitswap v0.2.20-0.20200720173351-71e8bd0a1b1a
github.com/ipfs/go-blockservice => github.com/MichaelMure/go-blockservice v0.1.4-0.20200720174746-aaf978f8c7e2
github.com/ipfs/go-datastore => github.com/MichaelMure/go-datastore v0.1.1-0.20200720160526-a840b6a243a7
github.com/ipfs/go-ds-badger => github.com/MichaelMure/go-ds-badger v0.0.8-0.20200720160655-efad091216d6
github.com/ipfs/go-ds-flatfs => github.com/MichaelMure/go-ds-flatfs v0.1.1-0.20200720153411-098af07a315f
github.com/ipfs/go-ds-leveldb => github.com/MichaelMure/go-ds-leveldb v0.1.1-0.20200720161456-f8de4f0cb528
github.com/ipfs/go-ds-measure => github.com/MichaelMure/go-ds-measure v0.0.2-0.20200720154401-7670e7876069
github.com/ipfs/go-filestore => github.com/MichaelMure/go-filestore v1.0.1-0.20200720201744-9d0dd571946e
github.com/ipfs/go-ipfs-blockstore => github.com/MichaelMure/go-ipfs-blockstore v1.0.1-0.20200720163520-6daba4f3daa0
github.com/ipfs/go-ipfs-exchange-interface => github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20200720171619-b6887ed2e001
github.com/ipfs/go-ipfs-exchange-offline => github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20200720172012-8e028686ae1e
github.com/ipfs/go-ipfs-provider => github.com/MichaelMure/go-ipfs-provider v0.2.2-0.20200720181729-c6b88a2c5d79
github.com/ipfs/go-ipfs-routing => github.com/MichaelMure/go-ipfs-routing v0.1.1-0.20200720164918-a1a2e00f1a0e
github.com/ipfs/go-merkledag => github.com/MichaelMure/go-merkledag v0.2.1-0.20200720175629-7f5b5e19fa8c
github.com/libp2p/go-libp2p-kad-dht => github.com/MichaelMure/go-libp2p-kad-dht v0.0.0-20200721082318-ffb18adb50cb
github.com/ipfs/go-ipfs-pinner => github.com/MichaelMure/go-ipfs-pinner v0.0.0-20200721082940-4cae5f9fa49d
github.com/libp2p/go-libp2p-pubsub-router => github.com/MichaelMure/go-libp2p-pubsub-router v0.3.1-0.20200722090547-ec92a4e5678b
github.com/ipfs/go-graphsync => github.com/MichaelMure/go-graphsync v0.0.6-0.20200722092520-d944aa3c7da9
)

go 1.13
Loading