Skip to content

Commit

Permalink
When GC deletes a block also unprovide it
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <[email protected]>
  • Loading branch information
michaelavila committed Jan 8, 2019
1 parent 371157e commit d86a2fb
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 16 deletions.
3 changes: 2 additions & 1 deletion core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ package coreapi

import (
"context"
"github.com/ipfs/go-ipfs/core"

core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"

ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
Expand Down
13 changes: 11 additions & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-ipfs/core"
gc "github.com/ipfs/go-ipfs/pin/gc"
repo "github.com/ipfs/go-ipfs/repo"
provider "github.com/ipfs/go-ipfs/provider"

humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
Expand All @@ -29,6 +30,14 @@ type GC struct {
Storage uint64
}

type GCCleanup struct {
*provider.Provider
}

func (c *GCCleanup) Cleanup(cid cid.Cid) error {
return c.Unprovide(cid)
}

func NewGC(n *core.IpfsNode) (*GC, error) {
r := n.Repo
cfg, err := r.Config()
Expand Down Expand Up @@ -86,7 +95,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed := gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, roots)
rmed := gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, &GCCleanup{n.Provider}, roots)

return CollectResult(ctx, rmed, nil)
}
Expand Down Expand Up @@ -154,7 +163,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result
return out
}

return gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, roots)
return gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, &GCCleanup{n.Provider}, roots)
}

func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
Expand Down
15 changes: 12 additions & 3 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ type Result struct {
Error error
}

// Called with a cid.Cid that has just been removed from the blockstore.
// The intention is to clean up any other information about this block
// such as provider tracking.
type Cleanup interface {
Cleanup(cid.Cid) error
}

// GC performs a mark and sweep garbage collection of the blocks in the blockstore
// first, it creates a 'marked' set and adds to it the following:
// - all recursively pinned blocks, plus all of their descendants (recursively)
Expand All @@ -38,8 +45,7 @@ type Result struct {
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn pin.Pinner, bestEffortRoots []cid.Cid) <-chan Result {

func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn pin.Pinner, cleanup Cleanup, bestEffortRoots []cid.Cid) <-chan Result {
elock := log.EventBegin(ctx, "GC.lockWait")
unlocker := bs.GCLock()
elock.Done()
Expand Down Expand Up @@ -95,10 +101,13 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
if err != nil {
errors = true
output <- Result{Error: &CannotDeleteBlockError{k, err}}
//log.Errorf("Error removing key from blockstore: %s", err)
// log.Errorf("Error removing key from blockstore: %s", err)
// continue as error is non-fatal
continue loop
}
if err := cleanup.Cleanup(k); err != nil {
log.Warningf("Warning: unable to cleanup block: %s", k)
}
select {
case output <- Result{KeyRemoved: k}:
case <-ctx.Done():
Expand Down
17 changes: 13 additions & 4 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
provideOutgoingTimeout = 15 * time.Second
)

type Strategy func(context.Context, chan cid.Cid, cid.Cid)
type Strategy func(context.Context, cid.Cid) <-chan cid.Cid

type Provider struct {
ctx context.Context
Expand Down Expand Up @@ -60,7 +60,16 @@ func (p *Provider) Run() {

// Provider the given cid using specified strategy.
func (p *Provider) Provide(root cid.Cid) {
p.strategy(p.ctx, p.incoming, root)
cids := p.strategy(p.ctx, root)
go func() {
for cid := range cids {
p.incoming <- cid
}
}()
}

func (p *Provider) Unprovide(cid cid.Cid) error {
return p.tracker.Untrack(cid)
}

// Announce to the world that a block is provided.
Expand All @@ -81,7 +90,7 @@ func (p *Provider) handleIncoming() {
case key := <-p.incoming:
isTracking, err := p.tracker.IsTracking(key)
if err != nil {
log.Warning("Unable to check provider tracking: %s", err)
log.Warning("Unable to check provider tracking on incoming: %s", err)
continue
}

Expand Down Expand Up @@ -133,7 +142,7 @@ func (p *Provider) handleOutgoing() {
case key := <-p.outgoing:
isTracking, err := p.tracker.IsTracking(key)
if err != nil {
log.Warning("Unable to check provider tracking: %s, %s", key, err)
log.Warning("Unable to check provider tracking on outgoing: %s, %s", key, err)
continue
}
if isTracking {
Expand Down
16 changes: 10 additions & 6 deletions provider/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (
)

func NewProvideAllStrategy(dag ipld.DAGService) Strategy {
return func(ctx context.Context, cids chan cid.Cid, root cid.Cid) {
cids <- root
merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), root, func(cid cid.Cid) bool {
cids <- cid
return true
})
return func(ctx context.Context, root cid.Cid) <-chan cid.Cid {
cids := make(chan cid.Cid)
go func() {
cids <- root
merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), root, func(cid cid.Cid) bool {
cids <- cid
return true
})
}()
return cids
}
}
4 changes: 4 additions & 0 deletions provider/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (t *Tracker) Track(cid cid.Cid) error {
return t.datastore.Put(providerTrackingKey(cid), cid.Bytes())
}

func (t *Tracker) Untrack(cid cid.Cid) error {
return t.datastore.Delete(providerTrackingKey(cid))
}

func providerTrackingKey(cid cid.Cid) ds.Key {
return ds.NewKey(providerTrackingPrefix + cid.String())
}

0 comments on commit d86a2fb

Please sign in to comment.