Skip to content

Commit

Permalink
Refactor per code climate rules
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <[email protected]>
  • Loading branch information
michaelavila committed Mar 9, 2019
1 parent bfcea27 commit fdcf655
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 35 deletions.
2 changes: 1 addition & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
n.Resolver = resolver.NewBasicResolver(n.DAG)

// Provider
queue, err := provider.NewQueue("provider-v1", ctx, n.Repo.Datastore())
queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore())
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions core/coreapi/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
cid "github.com/ipfs/go-cid"
)

// ProviderAPI brings Provider behavior to CoreAPI
type ProviderAPI CoreAPI

func (api *ProviderAPI) Provide(root cid.Cid) error {
return api.provider.Provide(root)
// Provide the given cid using the current provider
func (api *ProviderAPI) Provide(cid cid.Cid) error {
return api.provider.Provide(cid)
}
3 changes: 2 additions & 1 deletion provider/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package provider

import "github.com/ipfs/go-cid"

type offlineProvider struct {}
type offlineProvider struct{}

// NewOfflineProvider creates a Provider that does nothing
func NewOfflineProvider() Provider {
return &offlineProvider{}
}
Expand Down
4 changes: 2 additions & 2 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ const (
provideOutgoingWorkerLimit = 8
)

// Provider announces blocks to the network
type Provider interface {
Run()
Provide(cid.Cid) error
}

// Provider announces blocks to the network, tracks which blocks are
// being provided, and untracks blocks when they're no longer in the blockstore.
type provider struct {
ctx context.Context
// the CIDs for which provide announcements should be made
Expand All @@ -33,6 +32,7 @@ type provider struct {
contentRouting routing.ContentRouting
}

// NewProvider creates a provider that announces blocks to the network using a content router
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider {
return &provider{
ctx: ctx,
Expand Down
60 changes: 31 additions & 29 deletions provider/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
// not removed from the datastore until you call Complete() on the entry you
// receive.
type Entry struct {
cid cid.Cid
key ds.Key
cid cid.Cid
key ds.Key
queue *Queue
}

// Complete the entry by removing it from the queue
func (e *Entry) Complete() error {
return e.queue.remove(e.key)
}
Expand All @@ -41,36 +42,37 @@ type Queue struct {
tail uint64
head uint64

lock sync.Mutex
lock sync.Mutex
datastore ds.Datastore

dequeue chan *Entry
dequeue chan *Entry
notEmpty chan struct{}

isRunning bool
}

func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/"))
head, tail, err := getQueueHeadTail(name, ctx, namespaced)
// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(datastore, ds.NewKey("/"+name+"/queue/"))
head, tail, err := getQueueHeadTail(ctx, name, namespaced)
if err != nil {
return nil, err
}
q := &Queue{
name: name,
ctx: ctx,
head: head,
tail: tail,
lock: sync.Mutex{},
name: name,
ctx: ctx,
head: head,
tail: tail,
lock: sync.Mutex{},
datastore: namespaced,
dequeue: make(chan *Entry),
notEmpty: make(chan struct{}),
dequeue: make(chan *Entry),
notEmpty: make(chan struct{}),
isRunning: false,
}
return q, nil
}

// Put a cid in the queue
// Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) error {
q.lock.Lock()
defer q.lock.Unlock()
Expand All @@ -95,21 +97,18 @@ func (q *Queue) Enqueue(cid cid.Cid) error {
return nil
}

// Remove an entry from the queue.
// Dequeue returns a channel that if listened to will remove entries from the queue
func (q *Queue) Dequeue() <-chan *Entry {
return q.dequeue
}

// IsEmpty returns whether r not the queue has any items
func (q *Queue) IsEmpty() bool {
return (q.tail - q.head) == 0
}

func (q *Queue) remove(key ds.Key) error {
return q.datastore.Delete(key)
}

// dequeue items when the dequeue channel is available to
// be written to
// Run dequeues items when the dequeue channel is available to
// be written to.
func (q *Queue) Run() {
q.isRunning = true
go func() {
Expand Down Expand Up @@ -178,9 +177,9 @@ func (q *Queue) next() (*Entry, error) {
return nil, err
}

entry := &Entry {
cid: id,
key: nextKey,
entry := &Entry{
cid: id,
key: nextKey,
queue: q,
}

Expand All @@ -194,14 +193,14 @@ func (q *Queue) queueKey(id uint64) ds.Key {
}

// crawl over the queue entries to find the head and tail
func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) {
func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) {
query := query.Query{}
results, err := datastore.Query(query)
if err != nil {
return 0, 0, err
}

var tail uint64 = 0
var tail uint64
var head uint64 = math.MaxUint64
for entry := range results.Next() {
select {
Expand All @@ -219,8 +218,8 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore)
head = id
}

if (id+1) > tail {
tail = (id+1)
if (id + 1) > tail {
tail = (id + 1)
}
}
if err := results.Close(); err != nil {
Expand All @@ -233,3 +232,6 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore)
return head, tail, nil
}

func (q *Queue) remove(key ds.Key) error {
return q.datastore.Delete(key)
}

0 comments on commit fdcf655

Please sign in to comment.