Skip to content

Commit

Permalink
namesys: select on output
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <[email protected]>
  • Loading branch information
magik6k committed Oct 16, 2018
1 parent 734615a commit 7fcf56e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 54 deletions.
37 changes: 17 additions & 20 deletions namesys/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
defer close(outCh)
var subCh <-chan Result
var cancelSub context.CancelFunc
defer func() {
if cancelSub != nil {
cancelSub()
}
}()

for {
select {
Expand All @@ -59,20 +64,17 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
}

if res.err != nil {
outCh <- Result{Err: res.err}
if cancelSub != nil {
cancelSub()
}
emitResult(ctx, outCh, Result{Err: res.err})
return
}
log.Debugf("resolved %s to %s", name, res.value.String())
if !strings.HasPrefix(res.value.String(), ipnsPrefix) {
outCh <- Result{Path: res.value}
emitResult(ctx, outCh, Result{Path: res.value})
break
}

if depth == 1 {
outCh <- Result{Path: res.value, Err: ErrResolveRecursion}
emitResult(ctx, outCh, Result{Path: res.value, Err: ErrResolveRecursion})
break
}

Expand All @@ -87,6 +89,7 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
cancelSub()
}
subCtx, cancelSub = context.WithCancel(ctx)
_ = cancelSub

p := strings.TrimPrefix(res.value.String(), ipnsPrefix)
subCh = resolveAsync(subCtx, r, p, subopts)
Expand All @@ -96,27 +99,21 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
break
}

select {
case outCh <- res:
case <-ctx.Done():
if cancelSub != nil {
cancelSub()
}
return
}
emitResult(ctx, outCh, res)
case <-ctx.Done():
if cancelSub != nil {
cancelSub()
}
return
}
if resCh == nil && subCh == nil {
if cancelSub != nil {
cancelSub()
}
return
}
}
}()
return outCh
}

func emitResult(ctx context.Context, outCh chan<- Result, r Result) {
select {
case outCh <- r:
case <-ctx.Done():
}
}
10 changes: 2 additions & 8 deletions namesys/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
}
if subRes.error == nil {
p, err := appendPath(subRes.path)
select {
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{value: p, err: err})
return
}
case rootRes, ok := <-rootChan:
Expand All @@ -93,10 +90,7 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
}
if rootRes.error == nil {
p, err := appendPath(rootRes.path)
select {
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{value: p, err: err})
}
case <-ctx.Done():
return
Expand Down
19 changes: 9 additions & 10 deletions namesys/namesys.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,11 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.
if len(segments) > 3 {
p, err := path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
if err != nil {
select {
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
return
emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: err})
}
}

select {
case out <- onceResult{value: p, ttl: res.ttl, err: res.err}:
case <-ctx.Done():
return
}
emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: res.err})
case <-ctx.Done():
return
}
Expand All @@ -163,6 +155,13 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.
return out
}

func emitOnceResult(ctx context.Context, outCh chan<- onceResult, r onceResult) {
select {
case outCh <- r:
case <-ctx.Done():
}
}

// Publish implements Publisher
func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error {
return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordTTL))
Expand Down
20 changes: 4 additions & 16 deletions namesys/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
err = proto.Unmarshal(val, entry)
if err != nil {
log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err)
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{err: err})
return
}

Expand All @@ -129,10 +126,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
// Not a multihash, probably a new style record
p, err = path.ParsePath(string(entry.GetValue()))
if err != nil {
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{err: err})
return
}
}
Expand All @@ -154,17 +148,11 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
}
default:
log.Errorf("encountered error when parsing EOL: %s", err)
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{err: err})
return
}

select {
case out <- onceResult{value: p, ttl: ttl}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{value: p, ttl: ttl})
case <-ctx.Done():
return
}
Expand Down

0 comments on commit 7fcf56e

Please sign in to comment.