diff --git a/core/commands/add.go b/core/commands/add.go index a6c809aa182..d18fa374175 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -6,14 +6,14 @@ import ( "os" "strings" - cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" - options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - pb "gx/ipfs/QmPtj12fdwuAqj9sBSTNUxBNu8kCGNp8b3o8yUzMm5GHpq/pb" - files "gx/ipfs/QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC/go-ipfs-files" - cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds" - cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" + "gx/ipfs/QmPtj12fdwuAqj9sBSTNUxBNu8kCGNp8b3o8yUzMm5GHpq/pb" + "gx/ipfs/QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC/go-ipfs-files" + "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds" + "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" mh "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash" ) diff --git a/core/commands/block.go b/core/commands/block.go index 70603712dfa..14459bd89dd 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -184,6 +184,8 @@ than 'sha2-256' or format to anything other than 'v0' will result in CIDv1. } return cmds.EmitOnce(res, &BlockStat{ + // TODO be careful checking ErrNotFound. If the underlying + // implementation changes, this will break. Key: p.Path().Cid().String(), Size: p.Size(), }) diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index ffecb309027..1eb6907a8b0 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -63,6 +63,11 @@ into an object of the specified format. cmdkit.StringOption("hash", "Hash function to use").WithDefault(""), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) + if err != nil { + return err + } + nd, err := cmdenv.GetNode(env) if err != nil { return err @@ -138,7 +143,11 @@ into an object of the specified format. return err } } - return nil + + return cids.ForEach(func(cid cid.Cid) error { + api.Provider().Provide(cid) + return nil + }) }, Type: OutputObject{}, Encoders: cmds.EncoderMap{ @@ -161,6 +170,11 @@ format. cmdkit.StringArg("ref", true, false, "The object to get").EnableStdin(), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) + if err != nil { + return err + } + nd, err := cmdenv.GetNode(env) if err != nil { return err @@ -180,6 +194,8 @@ format. return err } + api.Provider().Provide(obj.Cid()) + var out interface{} = obj if len(rem) > 0 { final, _, err := obj.Resolve(rem) @@ -204,6 +220,11 @@ var DagResolveCmd = &cmds.Command{ cmdkit.StringArg("ref", true, false, "The path to resolve").EnableStdin(), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) + if err != nil { + return err + } + nd, err := cmdenv.GetNode(env) if err != nil { return err @@ -219,6 +240,8 @@ var DagResolveCmd = &cmds.Command{ return err } + api.Provider().Provide(lastCid) + return cmds.EmitOnce(res, &ResolveOutput{ Cid: lastCid, RemPath: path.Join(rem), diff --git a/core/commands/get.go b/core/commands/get.go index a7b0b74b8e2..c14358dd831 100644 --- a/core/commands/get.go +++ b/core/commands/get.go @@ -66,6 +66,11 @@ may also specify the level of compression by specifying '-l=<1-9>'. return err } + api, err := cmdenv.GetApi(env) + if err != nil { + return err + } + node, err := cmdenv.GetNode(env) if err != nil { return err @@ -91,6 +96,8 @@ may also specify the level of compression by specifying '-l=<1-9>'. return err } + api.Provider().Provide(dn.Cid()) + archive, _ := req.Options[archiveOptionName].(bool) reader, err := uarchive.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl) if err != nil { diff --git a/core/commands/ls.go b/core/commands/ls.go index 6fd38f59128..75815fc0027 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -141,6 +141,8 @@ The JSON output contains type information. Hash: paths[i], Links: outputLinks, } + + api.Provider().Provide(dagnode.Cid()) } return cmds.EmitOnce(res, &LsOutput{output}) @@ -177,6 +179,8 @@ The JSON output contains type information. return err } } + + api.Provider().Provide(dagnode.Cid()) } return nil }, diff --git a/core/commands/refs.go b/core/commands/refs.go index c86f6120818..70562039656 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" "io" "strings" @@ -73,6 +74,11 @@ NOTE: List all references recursively by using the flag '-r'. return err } + api, err := cmdenv.GetApi(env) + if err != nil { + return err + } + ctx := req.Context n, err := cmdenv.GetNode(env) if err != nil { @@ -97,7 +103,7 @@ NOTE: List all references recursively by using the flag '-r'. format = " -> " } - objs, err := objectsForPaths(ctx, n, req.Arguments) + objs, err := objectsForPaths(ctx, n, api, req.Arguments) if err != nil { return err } @@ -159,7 +165,7 @@ Displays the hashes of all local objects. Type: RefWrapper{}, } -func objectsForPaths(ctx context.Context, n *core.IpfsNode, paths []string) ([]ipld.Node, error) { +func objectsForPaths(ctx context.Context, n *core.IpfsNode, api coreiface.CoreAPI, paths []string) ([]ipld.Node, error) { objects := make([]ipld.Node, len(paths)) for i, sp := range paths { p, err := path.ParsePath(sp) @@ -171,6 +177,9 @@ func objectsForPaths(ctx context.Context, n *core.IpfsNode, paths []string) ([]i if err != nil { return nil, err } + + api.Provider().Provide(o.Cid()) + objects[i] = o } return objects, nil diff --git a/core/commands/tar.go b/core/commands/tar.go index a17bb8c2697..7cbe811c14f 100644 --- a/core/commands/tar.go +++ b/core/commands/tar.go @@ -39,6 +39,11 @@ represent it. cmdkit.FileArg("file", true, false, "Tar file to add.").EnableStdin(), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) + if err != nil { + return err + } + nd, err := cmdenv.GetNode(env) if err != nil { return err @@ -56,6 +61,9 @@ represent it. c := node.Cid() + api.Provider().Provide(c) + + // TODO: why is this here? fi.FileName() return cmds.EmitOnce(res, &coreiface.AddEvent{ Name: fi.FileName(), diff --git a/core/core.go b/core/core.go index c117070003c..ed11f0cfc62 100644 --- a/core/core.go +++ b/core/core.go @@ -20,6 +20,7 @@ import ( "strings" "time" + provider "github.com/ipfs/go-ipfs/provider" version "github.com/ipfs/go-ipfs" rp "github.com/ipfs/go-ipfs/exchange/reprovide" filestore "github.com/ipfs/go-ipfs/filestore" @@ -133,6 +134,7 @@ type IpfsNode struct { Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes Reprovider *rp.Reprovider // the value reprovider system + Provider *provider.Provider // the value provider system IpnsRepub *ipnsrp.Republisher PubSub *pubsub.PubSub @@ -317,6 +319,15 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { return err } + // Provider + + // TODO: Make strategy configurable + strategy := provider.NewProvideAllStrategy(n.DAG) + n.Provider = provider.NewProvider(ctx, strategy, n.Routing) + go n.Provider.Run() + + // Reprovider + var keyProvider rp.KeyChanFunc switch cfg.Reprovider.Strategy { diff --git a/core/coreapi/block.go b/core/coreapi/block.go index 933d5de721e..76876220af9 100644 --- a/core/coreapi/block.go +++ b/core/coreapi/block.go @@ -48,6 +48,8 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc return nil, err } + api.core().Provider().Provide(b.Cid()) + return &BlockStat{path: coreiface.IpldPath(b.Cid()), size: len(data)}, nil } @@ -62,6 +64,8 @@ func (api *BlockAPI) Get(ctx context.Context, p coreiface.Path) (io.Reader, erro return nil, err } + api.core().Provider().Provide(rp.Cid()) + return bytes.NewReader(b.RawData()), nil } @@ -114,6 +118,8 @@ func (api *BlockAPI) Stat(ctx context.Context, p coreiface.Path) (coreiface.Bloc return nil, err } + api.core().Provider().Provide(b.Cid()) + return &BlockStat{ path: coreiface.IpldPath(b.Cid()), size: len(b.RawData()), diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 20e4c4a88d7..b025bb99b0e 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -15,8 +15,7 @@ package coreapi import ( "context" - - core "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format" @@ -87,6 +86,10 @@ func (api *CoreAPI) PubSub() coreiface.PubSubAPI { return (*PubSubAPI)(api) } +func (api *CoreAPI) Provider() coreiface.ProviderAPI { + return (*ProviderAPI)(api) +} + // getSession returns new api backed by the same node with a read-only session DAG func (api *CoreAPI) getSession(ctx context.Context) *CoreAPI { ng := dag.NewReadOnlyDagService(dag.NewSession(ctx, api.dag)) diff --git a/core/coreapi/interface/coreapi.go b/core/coreapi/interface/coreapi.go index bab4fc13b39..da13debd977 100644 --- a/core/coreapi/interface/coreapi.go +++ b/core/coreapi/interface/coreapi.go @@ -37,6 +37,9 @@ type CoreAPI interface { // Swarm returns an implementation of Swarm API Swarm() SwarmAPI + // Provider returns an implementation of ProvideAPI + Provider() ProviderAPI + // PubSub returns an implementation of PubSub API PubSub() PubSubAPI diff --git a/core/coreapi/interface/provider.go b/core/coreapi/interface/provider.go new file mode 100644 index 00000000000..1dbeac3b68f --- /dev/null +++ b/core/coreapi/interface/provider.go @@ -0,0 +1,9 @@ +package iface + +import "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + +type ProviderAPI interface { + // Announce that the given cid is being provided + Provide(cid.Cid) +} + diff --git a/core/coreapi/object.go b/core/coreapi/object.go index bd53c2fe2a4..a46338fd35f 100644 --- a/core/coreapi/object.go +++ b/core/coreapi/object.go @@ -54,6 +54,9 @@ func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) ( if err != nil { return nil, err } + + api.core().Provider().Provide(n.Cid()) + return n, nil } @@ -134,6 +137,8 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj } } + api.core().Provider().Provide(dagnode.Cid()) + return coreiface.IpfsPath(dagnode.Cid()), nil } @@ -231,6 +236,8 @@ func (api *ObjectAPI) AddLink(ctx context.Context, base coreiface.Path, name str return nil, err } + api.core().Provider().Provide(nnode.Cid()) + return coreiface.IpfsPath(nnode.Cid()), nil } @@ -257,6 +264,8 @@ func (api *ObjectAPI) RmLink(ctx context.Context, base coreiface.Path, link stri return nil, err } + api.core().Provider().Provide(nnode.Cid()) + return coreiface.IpfsPath(nnode.Cid()), nil } @@ -294,6 +303,8 @@ func (api *ObjectAPI) patchData(ctx context.Context, path coreiface.Path, r io.R return nil, err } + api.core().Provider().Provide(pbnd.Cid()) + return coreiface.IpfsPath(pbnd.Cid()), nil } diff --git a/core/coreapi/path.go b/core/coreapi/path.go index b28861886e4..ba39e130633 100644 --- a/core/coreapi/path.go +++ b/core/coreapi/path.go @@ -27,6 +27,9 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (ipld.Nod if err != nil { return nil, err } + + api.Provider().Provide(node.Cid()) + return node, nil } diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go new file mode 100644 index 00000000000..c2f879b9024 --- /dev/null +++ b/core/coreapi/provider.go @@ -0,0 +1,10 @@ +package coreapi + +import "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + +type ProviderAPI CoreAPI + +func (api *ProviderAPI) Provide(cid cid.Cid) { + api.node.Provider.Provide(cid) +} + diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 1bbeaa59d24..eb0723636ac 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -130,6 +130,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.File, opts ...options if err != nil { return nil, err } + + if !settings.Local { + api.core().Provider().Provide(nd.Cid()) + } + return coreiface.IpfsPath(nd.Cid()), nil } @@ -141,6 +146,8 @@ func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (coreiface.Unix return nil, err } + api.core().Provider().Provide(nd.Cid()) + return newUnixfsFile(ctx, ses.dag, nd, "", nil) } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index dac1be79caf..1a032857cb3 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -499,6 +499,8 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) { return } + i.api.Provider().Provide(newcid) + i.addUserHeaders(w) // ok, _now_ write user's headers. w.Header().Set("IPFS-Hash", newcid.String()) http.Redirect(w, r, gopath.Join(ipfsPathPrefix, newcid.String(), newPath), http.StatusCreated) @@ -576,6 +578,8 @@ func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { // Redirect to new path ncid := newnode.Cid() + i.api.Provider().Provide(ncid) + i.addUserHeaders(w) // ok, _now_ write user's headers. w.Header().Set("IPFS-Hash", ncid.String()) http.Redirect(w, r, gopath.Join(ipfsPathPrefix+ncid.String(), path.Join(components[:len(components)-1])), http.StatusCreated) diff --git a/provider/provider.go b/provider/provider.go new file mode 100644 index 00000000000..5f014b863a1 --- /dev/null +++ b/provider/provider.go @@ -0,0 +1,142 @@ +package provider + +import ( + "context" + "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + "gx/ipfs/QmZBH87CAPFHcc7cYmBqeSQ98zQ3SX9KUxiYgzPmLWNVKz/go-libp2p-routing" + logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log" + "time" +) + +var ( + log = logging.Logger("provider") +) + +const ( + provideOutgoingLimit = 512 + provideOutgoingTimeout = time.Second * 15 +) + +type Strategy func(context.Context, chan cid.Cid, cid.Cid) + +type Provider struct { + ctx context.Context + + // cids we want to provide + incoming chan cid.Cid + // cids we are working on providing now + outgoing chan cid.Cid + + // strategy for deciding which cids, given a cid, should be provided + strategy Strategy + + contentRouting routing.ContentRouting // TODO: temp, maybe +} + +func NewProvider(ctx context.Context, strategy Strategy, contentRouting routing.ContentRouting) *Provider { + return &Provider{ + ctx: ctx, + outgoing: make(chan cid.Cid), + incoming: make(chan cid.Cid), + strategy: strategy, + contentRouting: contentRouting, + } +} + +// Start workers to handle provide requests. +func (p *Provider) Run() { + go p.handleIncoming() + go p.handleOutgoing() +} + +// Provider the given cid using specified strategy. +func (p *Provider) Provide(root cid.Cid) { + p.strategy(p.ctx, p.incoming, root) +} + +// Announce to the world that a block is provided. +// +// TODO: Refactor duplication between here and the reprovider. +func (p *Provider) Announce(cid cid.Cid) { + ctx, cancel := context.WithTimeout(p.ctx, provideOutgoingTimeout) + defer cancel() + + if err := p.contentRouting.Provide(ctx, cid, true); err != nil { + log.Warning("Failed to provide key: %s", err) + } +} + +// Workers + +// Handle incoming requests to provide blocks +// +// Basically, buffer everything that comes through the incoming channel. +// Then, whenever the outgoing channel is ready to receive a value, pull +// a value out of the buffer and put it onto the outgoing channel. +func (p *Provider) handleIncoming() { + var buffer []cid.Cid // unbounded buffer between incoming/outgoing + var nextKey cid.Cid + var keys chan cid.Cid + + for { + select { + case key, ok := <-p.incoming: + if !ok { + log.Debug("incoming channel closed") + return + } + + if keys == nil { + nextKey = key + keys = p.outgoing + } else { + buffer = append(buffer, key) + } + case keys <- nextKey: + if len(buffer) > 0 { + nextKey = buffer[0] + buffer = buffer[1:] + } else { + keys = nil + } + case <-p.ctx.Done(): + return + } + } +} + +// Handle all outgoing cids by providing them +func (p *Provider) handleOutgoing() { + limit := make(chan struct{}, provideOutgoingLimit) + limitedProvide := func(cid cid.Cid, workerId int) { + defer func() { + <-limit + }() + + ev := logging.LoggableMap{"ID": workerId} + // TODO: EventBegin deprecated? + defer log.EventBegin(p.ctx, "Ipfs.Provider.Worker.Work", ev, cid) + + p.Announce(cid) + } + + for workerId := 0; ; workerId++ { + ev := logging.LoggableMap{"ID": workerId} + log.Event(p.ctx, "Ipfs.Provider.Worker.Loop", ev) + select { + case key, ok := <-p.outgoing: + if !ok { + log.Debug("outgoing channel closed") + return + } + select { + case limit <- struct{}{}: + go limitedProvide(key, workerId) + case <-p.ctx.Done(): + return + } + case <-p.ctx.Done(): + return + } + } +} diff --git a/provider/strategy.go b/provider/strategy.go new file mode 100644 index 00000000000..746e8bad757 --- /dev/null +++ b/provider/strategy.go @@ -0,0 +1,19 @@ +package provider + +import ( + "context" + "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + "gx/ipfs/QmdURv6Sbob8TVW2tFFve9vcEWrSUgwPqeqnXyvYhLrkyd/go-merkledag" + ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format" +) + +func NewProvideAllStrategy(dag ipld.DAGService) Strategy { + return func(ctx context.Context, cids chan cid.Cid, root cid.Cid) { + cids <- root + // TODO: Use schomatis' dag walker instead of this enumerate? + merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), root, func(cid cid.Cid) bool { + cids <- cid + return true + }) + } +} \ No newline at end of file