Skip to content

Commit

Permalink
Add provider to ipfs and provide when adding/fetching
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <[email protected]>
  • Loading branch information
michaelavila committed Dec 12, 2018
1 parent d34e3c8 commit 1fa7c84
Show file tree
Hide file tree
Showing 19 changed files with 292 additions and 11 deletions.
12 changes: 6 additions & 6 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"os"
"strings"

cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"

pb "gx/ipfs/QmPtj12fdwuAqj9sBSTNUxBNu8kCGNp8b3o8yUzMm5GHpq/pb"
files "gx/ipfs/QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC/go-ipfs-files"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
"gx/ipfs/QmPtj12fdwuAqj9sBSTNUxBNu8kCGNp8b3o8yUzMm5GHpq/pb"
"gx/ipfs/QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC/go-ipfs-files"
"gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
mh "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash"
)

Expand Down
2 changes: 2 additions & 0 deletions core/commands/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ than 'sha2-256' or format to anything other than 'v0' will result in CIDv1.
}

return cmds.EmitOnce(res, &BlockStat{
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
Key: p.Path().Cid().String(),
Size: p.Size(),
})
Expand Down
25 changes: 24 additions & 1 deletion core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ into an object of the specified format.
cmdkit.StringOption("hash", "Hash function to use").WithDefault(""),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

nd, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand Down Expand Up @@ -138,7 +143,11 @@ into an object of the specified format.
return err
}
}
return nil

return cids.ForEach(func(cid cid.Cid) error {
api.Provider().Provide(cid)
return nil
})
},
Type: OutputObject{},
Encoders: cmds.EncoderMap{
Expand All @@ -161,6 +170,11 @@ format.
cmdkit.StringArg("ref", true, false, "The object to get").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

nd, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -180,6 +194,8 @@ format.
return err
}

api.Provider().Provide(obj.Cid())

var out interface{} = obj
if len(rem) > 0 {
final, _, err := obj.Resolve(rem)
Expand All @@ -204,6 +220,11 @@ var DagResolveCmd = &cmds.Command{
cmdkit.StringArg("ref", true, false, "The path to resolve").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

nd, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -219,6 +240,8 @@ var DagResolveCmd = &cmds.Command{
return err
}

api.Provider().Provide(lastCid)

return cmds.EmitOnce(res, &ResolveOutput{
Cid: lastCid,
RemPath: path.Join(rem),
Expand Down
7 changes: 7 additions & 0 deletions core/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ may also specify the level of compression by specifying '-l=<1-9>'.
return err
}

api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

node, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -91,6 +96,8 @@ may also specify the level of compression by specifying '-l=<1-9>'.
return err
}

api.Provider().Provide(dn.Cid())

archive, _ := req.Options[archiveOptionName].(bool)
reader, err := uarchive.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ The JSON output contains type information.
Hash: paths[i],
Links: outputLinks,
}

api.Provider().Provide(dagnode.Cid())
}

return cmds.EmitOnce(res, &LsOutput{output})
Expand Down Expand Up @@ -177,6 +179,8 @@ The JSON output contains type information.
return err
}
}

api.Provider().Provide(dagnode.Cid())
}
return nil
},
Expand Down
13 changes: 11 additions & 2 deletions core/commands/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
"io"
"strings"

Expand Down Expand Up @@ -73,6 +74,11 @@ NOTE: List all references recursively by using the flag '-r'.
return err
}

api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

ctx := req.Context
n, err := cmdenv.GetNode(env)
if err != nil {
Expand All @@ -97,7 +103,7 @@ NOTE: List all references recursively by using the flag '-r'.
format = "<src> -> <dst>"
}

objs, err := objectsForPaths(ctx, n, req.Arguments)
objs, err := objectsForPaths(ctx, n, api, req.Arguments)
if err != nil {
return err
}
Expand Down Expand Up @@ -159,7 +165,7 @@ Displays the hashes of all local objects.
Type: RefWrapper{},
}

func objectsForPaths(ctx context.Context, n *core.IpfsNode, paths []string) ([]ipld.Node, error) {
func objectsForPaths(ctx context.Context, n *core.IpfsNode, api coreiface.CoreAPI, paths []string) ([]ipld.Node, error) {
objects := make([]ipld.Node, len(paths))
for i, sp := range paths {
p, err := path.ParsePath(sp)
Expand All @@ -171,6 +177,9 @@ func objectsForPaths(ctx context.Context, n *core.IpfsNode, paths []string) ([]i
if err != nil {
return nil, err
}

api.Provider().Provide(o.Cid())

objects[i] = o
}
return objects, nil
Expand Down
8 changes: 8 additions & 0 deletions core/commands/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ represent it.
cmdkit.FileArg("file", true, false, "Tar file to add.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

nd, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -56,6 +61,9 @@ represent it.

c := node.Cid()

api.Provider().Provide(c)

// TODO: why is this here?
fi.FileName()
return cmds.EmitOnce(res, &coreiface.AddEvent{
Name: fi.FileName(),
Expand Down
11 changes: 11 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"time"

provider "github.com/ipfs/go-ipfs/provider"
version "github.com/ipfs/go-ipfs"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore"
Expand Down Expand Up @@ -133,6 +134,7 @@ type IpfsNode struct {
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Reprovider *rp.Reprovider // the value reprovider system
Provider *provider.Provider // the value provider system
IpnsRepub *ipnsrp.Republisher

PubSub *pubsub.PubSub
Expand Down Expand Up @@ -317,6 +319,15 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
return err
}

// Provider

// TODO: Make strategy configurable
strategy := provider.NewProvideAllStrategy(n.DAG)
n.Provider = provider.NewProvider(ctx, strategy, n.Routing)
go n.Provider.Run()

// Reprovider

var keyProvider rp.KeyChanFunc

switch cfg.Reprovider.Strategy {
Expand Down
6 changes: 6 additions & 0 deletions core/coreapi/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc
return nil, err
}

api.core().Provider().Provide(b.Cid())

return &BlockStat{path: coreiface.IpldPath(b.Cid()), size: len(data)}, nil
}

Expand All @@ -62,6 +64,8 @@ func (api *BlockAPI) Get(ctx context.Context, p coreiface.Path) (io.Reader, erro
return nil, err
}

api.core().Provider().Provide(rp.Cid())

return bytes.NewReader(b.RawData()), nil
}

Expand Down Expand Up @@ -114,6 +118,8 @@ func (api *BlockAPI) Stat(ctx context.Context, p coreiface.Path) (coreiface.Bloc
return nil, err
}

api.core().Provider().Provide(b.Cid())

return &BlockStat{
path: coreiface.IpldPath(b.Cid()),
size: len(b.RawData()),
Expand Down
7 changes: 5 additions & 2 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ package coreapi

import (
"context"

core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"

ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
Expand Down Expand Up @@ -87,6 +86,10 @@ func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
return (*PubSubAPI)(api)
}

func (api *CoreAPI) Provider() coreiface.ProviderAPI {
return (*ProviderAPI)(api)
}

// getSession returns new api backed by the same node with a read-only session DAG
func (api *CoreAPI) getSession(ctx context.Context) *CoreAPI {
ng := dag.NewReadOnlyDagService(dag.NewSession(ctx, api.dag))
Expand Down
3 changes: 3 additions & 0 deletions core/coreapi/interface/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type CoreAPI interface {
// Swarm returns an implementation of Swarm API
Swarm() SwarmAPI

// Provider returns an implementation of ProvideAPI
Provider() ProviderAPI

// PubSub returns an implementation of PubSub API
PubSub() PubSubAPI

Expand Down
9 changes: 9 additions & 0 deletions core/coreapi/interface/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package iface

import "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"

type ProviderAPI interface {
// Announce that the given cid is being provided
Provide(cid.Cid)
}

11 changes: 11 additions & 0 deletions core/coreapi/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) (
if err != nil {
return nil, err
}

api.core().Provider().Provide(n.Cid())

return n, nil
}

Expand Down Expand Up @@ -134,6 +137,8 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj
}
}

api.core().Provider().Provide(dagnode.Cid())

return coreiface.IpfsPath(dagnode.Cid()), nil
}

Expand Down Expand Up @@ -231,6 +236,8 @@ func (api *ObjectAPI) AddLink(ctx context.Context, base coreiface.Path, name str
return nil, err
}

api.core().Provider().Provide(nnode.Cid())

return coreiface.IpfsPath(nnode.Cid()), nil
}

Expand All @@ -257,6 +264,8 @@ func (api *ObjectAPI) RmLink(ctx context.Context, base coreiface.Path, link stri
return nil, err
}

api.core().Provider().Provide(nnode.Cid())

return coreiface.IpfsPath(nnode.Cid()), nil
}

Expand Down Expand Up @@ -294,6 +303,8 @@ func (api *ObjectAPI) patchData(ctx context.Context, path coreiface.Path, r io.R
return nil, err
}

api.core().Provider().Provide(pbnd.Cid())

return coreiface.IpfsPath(pbnd.Cid()), nil
}

Expand Down
3 changes: 3 additions & 0 deletions core/coreapi/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (ipld.Nod
if err != nil {
return nil, err
}

api.Provider().Provide(node.Cid())

return node, nil
}

Expand Down
10 changes: 10 additions & 0 deletions core/coreapi/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package coreapi

import "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"

type ProviderAPI CoreAPI

func (api *ProviderAPI) Provide(cid cid.Cid) {
api.node.Provider.Provide(cid)
}

7 changes: 7 additions & 0 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.File, opts ...options
if err != nil {
return nil, err
}

if !settings.Local {
api.core().Provider().Provide(nd.Cid())
}

return coreiface.IpfsPath(nd.Cid()), nil
}

Expand All @@ -141,6 +146,8 @@ func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (coreiface.Unix
return nil, err
}

api.core().Provider().Provide(nd.Cid())

return newUnixfsFile(ctx, ses.dag, nd, "", nil)
}

Expand Down
Loading

0 comments on commit 1fa7c84

Please sign in to comment.