Skip to content

Commit

Permalink
commands/dht: use res.Emit directly
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Overbool <[email protected]>
  • Loading branch information
overbool committed Oct 28, 2018
1 parent 6e19e8c commit 514ecaf
Showing 1 changed file with 41 additions and 83 deletions.
124 changes: 41 additions & 83 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"time"

cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
"gx/ipfs/QmRKuTyCzg7HFBcV1YUhzStroGtJSb8iWgyxfsDCwFhWTS/go-path"
dag "gx/ipfs/QmY8BMUSpCwNiTmFhACmC9Bt1qT63cHP35AoQAus4x14qH/go-merkledag"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
path "gx/ipfs/QmRKuTyCzg7HFBcV1YUhzStroGtJSb8iWgyxfsDCwFhWTS/go-path"
cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
peer "gx/ipfs/QmTRhk7cgjUf2gfQ3p2M9KPECNZEW9XUrmHcFCgog4cPgB/go-libp2p-peer"
pstore "gx/ipfs/QmTTJcDL3gsnGDALjh2fDGg1onGRUdVgNL2hU2WEZcVrMX/go-libp2p-peerstore"
b58 "gx/ipfs/QmWFAMPqsEyUX7gDUsRVmMWz59FxSpJ1b2v6bJ1yYzo7jY/go-base58-fast/base58"
dag "gx/ipfs/QmY8BMUSpCwNiTmFhACmC9Bt1qT63cHP35AoQAus4x14qH/go-merkledag"
routing "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing"
notif "gx/ipfs/QmcQ81jSyWCp1jpkQ8CMbtpXT3jK7Wg6ZtYmoyWFgBoF9c/go-libp2p-routing/notifications"
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
Expand Down Expand Up @@ -93,21 +93,13 @@ var queryDhtCmd = &cmds.Command{
}
}()

outChan := make(chan interface{})

go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context.Done():
return
}
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}()

return res.Emit(outChan)
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
Expand Down Expand Up @@ -167,22 +159,10 @@ var findProvidersDhtCmd = &cmds.Command{
return err
}

outChan := make(chan interface{})

ctx, cancel := context.WithCancel(req.Context)
ctx, events := notif.RegisterForQueryEvents(ctx)

pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context.Done():
return
}
}
}()

go func() {
defer cancel()
Expand All @@ -194,8 +174,13 @@ var findProvidersDhtCmd = &cmds.Command{
})
}
}()
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}

return res.Emit(outChan)
return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
Expand Down Expand Up @@ -279,22 +264,9 @@ var provideRefDhtCmd = &cmds.Command{
cids = append(cids, c)
}

outChan := make(chan interface{})

ctx, cancel := context.WithCancel(req.Context)
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context.Done():
return
}
}
}()

go func() {
defer cancel()
var err error
Expand All @@ -311,7 +283,13 @@ var provideRefDhtCmd = &cmds.Command{
}
}()

return res.Emit(outChan)
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
Expand Down Expand Up @@ -395,22 +373,9 @@ var findPeerDhtCmd = &cmds.Command{
return err
}

outChan := make(chan interface{})

ctx, cancel := context.WithCancel(req.Context)
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
for v := range events {
select {
case outChan <- v:
case <-req.Context.Done():
}

}
}()

go func() {
defer cancel()
pi, err := nd.Routing.FindPeer(ctx, pid)
Expand All @@ -428,7 +393,13 @@ var findPeerDhtCmd = &cmds.Command{
})
}()

return res.Emit(outChan)
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
Expand Down Expand Up @@ -484,21 +455,9 @@ Different key types can specify other 'best' rules.
return err
}

outChan := make(chan interface{})

ctx, cancel := context.WithCancel(req.Context)
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context.Done():
}
}
}()

go func() {
defer cancel()
val, err := nd.Routing.GetValue(ctx, dhtkey)
Expand All @@ -515,7 +474,13 @@ Different key types can specify other 'best' rules.
}
}()

return res.Emit(outChan)
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
Expand Down Expand Up @@ -582,24 +547,11 @@ NOTE: A value may not exceed 2048 bytes.
return err
}

outChan := make(chan interface{})

data := req.Arguments[1]

ctx, cancel := context.WithCancel(req.Context)
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
for e := range events {
select {
case outChan <- e:
case <-req.Context.Done():
return
}
}
}()

go func() {
defer cancel()
err := nd.Routing.PutValue(ctx, key, []byte(data))
Expand All @@ -611,7 +563,13 @@ NOTE: A value may not exceed 2048 bytes.
}
}()

return res.Emit(outChan)
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *notif.QueryEvent) error {
Expand Down

0 comments on commit 514ecaf

Please sign in to comment.