From e52c5ec58ef644bc10ae2faaaa06e1512b76fa8f Mon Sep 17 00:00:00 2001 From: lash Date: Mon, 26 Nov 2018 23:58:24 +0100 Subject: [PATCH] swarm/pss: Rebase on master after kad depth change + handler refactor --- swarm/pss/pss.go | 69 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index d0986d280b..32bad6fbef 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -897,10 +897,38 @@ func (p *Pss) forward(msg *PssMsg) error { // send with kademlia // find the closest peer to the recipient and attempt to send + + // number of sends performed. enables us to evaluate whether send was at all successful sent := 0 - p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool { + + // TODO: debug, remove in production + // calculate proximity to recipient address + ponow, _ := p.Kademlia.Pof(p.BaseAddr(), to, 0) + + // The effective depth is the same as nearest neighbor depth OR + // the amount of address bytes in the neighbor, whichever is shallower + // this term aliasing has the effect of considering ALL connected peers + // who match the address prefix as nearest neighbors, and we will forward + // to all of them. + effectiveDepth := p.Kademlia.NeighbourhoodDepth() + darkRadius := len(msg.To) * 8 + if darkRadius < addressLength*8 && effectiveDepth > darkRadius { + effectiveDepth = darkRadius + } + + // Set to depth on the first successful send + cutoffDepth := 0 + + p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, isproxbin bool) bool { info := sp.Info() + // the cutoffDepth will be set after the first successful send. + // that means that before a send has been made OR the peer returned + // is still within the effective depth, we will pass through this check + if po < cutoffDepth { + return false + } + // check if the peer is running pss var ispss bool for _, cap := range info.Caps { @@ -915,12 +943,18 @@ func (p *Pss) forward(msg *PssMsg) error { } // get the protocol peer from the forwarding peer cache - sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address()) p.fwdPoolMu.RLock() pp := p.fwdPool[sp.Info().ID] p.fwdPoolMu.RUnlock() + // TODO: debug, remove in production + // calculate proximity from returned kademlia peer to destination and log it + powill, _ := p.Kademlia.Pof(sp.Address(), to, 0) + log.Debug("forward", "topic", label(msg.Payload.Topic[:]), "self", label(p.BaseAddr()), "to", label(sp.Address()), "dest", label(to), "po", ponow, "advance", powill-ponow) + println(p.Kademlia.String()) + // attempt to send the message + // short circuit to next iteration pass when it fails err := pp.Send(context.TODO(), msg) if err != nil { metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) @@ -928,26 +962,23 @@ func (p *Pss) forward(msg *PssMsg) error { return true } sent++ - log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg)) - - // continue forwarding if: - // - if the peer is end recipient but the full address has not been disclosed - // - if the peer address matches the partial address fully - // - if the peer is in proxbin - if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) { - log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match")) - return true - } else if isproxbin { - log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address()))) - return true + + // If the po is at addresslength (TODO: how can it be greater?) + // it means that the peer address is identical to the message address + // and that peer must be the final recipient + // further forwarding is thus not needed + if po >= addressLength*8 { + return false } - // at this point we stop forwarding, and the state is as follows: - // - the peer is end recipient and we have full address - // - we are not in proxbin (directed routing) - // - partial addresses don't fully match - return false + + // activate the cutoff when we have a successful send + if sent == 1 { + cutoffDepth = effectiveDepth + } + return true }) + // if we failed to send to anyone, re-insert message in the send-queue if sent == 0 { log.Debug("unable to forward to any peers") if err := p.enqueue(msg); err != nil {