From 854f068ed60918c624857594a703a6724b8efb3a Mon Sep 17 00:00:00 2001
From: gary rong <garyrong0905@gmail.com>
Date: Tue, 27 Apr 2021 15:44:59 +0800
Subject: [PATCH] les: polish code (#22625)

* les: polish code

* les/vflux/server: fixes

* les: fix lint
---
 les/metrics.go                   |   1 -
 les/peer.go                      |   1 -
 les/server_handler.go            |  21 +++---
 les/vflux/server/prioritypool.go | 123 +++++++++++++++++++------------
 4 files changed, 83 insertions(+), 63 deletions(-)

diff --git a/les/metrics.go b/les/metrics.go
index d356326b76ef..07d3133c95a6 100644
--- a/les/metrics.go
+++ b/les/metrics.go
@@ -71,7 +71,6 @@ var (
 
 	connectionTimer       = metrics.NewRegisteredTimer("les/connection/duration", nil)
 	serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
-	clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
 
 	totalCapacityGauge   = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
 	totalRechargeGauge   = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
diff --git a/les/peer.go b/les/peer.go
index f6cc94dfad22..c6c672942b59 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -1099,7 +1099,6 @@ func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge
 				// set default announceType on server side
 				p.announceType = announceTypeSimple
 			}
-			p.fcClient = flowcontrol.NewClientNode(server.fcManager, p.fcParams)
 		}
 		return nil
 	})
diff --git a/les/server_handler.go b/les/server_handler.go
index 0a683c1b41d7..80fcf1c44e16 100644
--- a/les/server_handler.go
+++ b/les/server_handler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/les/flowcontrol"
 	"github.com/ethereum/go-ethereum/light"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
@@ -122,26 +123,27 @@ func (h *serverHandler) handle(p *clientPeer) error {
 		p.Log().Debug("Light Ethereum handshake failed", "err", err)
 		return err
 	}
-
+	// Connected to another server, no messages expected, just wait for disconnection
 	if p.server {
 		if err := h.server.serverset.register(p); err != nil {
 			return err
 		}
-		// connected to another server, no messages expected, just wait for disconnection
 		_, err := p.rw.ReadMsg()
 		h.server.serverset.unregister(p)
 		return err
 	}
-	defer p.fcClient.Disconnect() // set by handshake if it's not another server
+	// Set up the flow control mechanism for the peer
+	p.fcClient = flowcontrol.NewClientNode(h.server.fcManager, p.fcParams)
+	defer p.fcClient.Disconnect()
 
-	// Reject light clients if server is not synced.
-	//
-	// Put this checking here, so that "non-synced" les-server peers are still allowed
-	// to keep the connection.
+	// Reject light clients if the server is not synced. Put this check here, so
+	// that "non-synced" les-server peers are still allowed to keep the connection.
 	if !h.synced() {
 		p.Log().Debug("Light server not synced, rejecting peer")
 		return p2p.DiscRequested
 	}
+
+	// Register the peer into the peerset and clientpool
 	if err := h.server.peers.register(p); err != nil {
 		return err
 	}
@@ -150,19 +152,14 @@ func (h *serverHandler) handle(p *clientPeer) error {
 		p.Log().Debug("Client pool already closed")
 		return p2p.DiscRequested
 	}
-	activeCount, _ := h.server.clientPool.Active()
-	clientConnectionGauge.Update(int64(activeCount))
 	p.connectedAt = mclock.Now()
 
 	var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
-
 	defer func() {
 		wg.Wait() // Ensure all background task routines have exited.
 		h.server.clientPool.Unregister(p)
 		h.server.peers.unregister(p.ID())
 		p.balance = nil
-		activeCount, _ := h.server.clientPool.Active()
-		clientConnectionGauge.Update(int64(activeCount))
 		connectionTimer.Update(time.Duration(mclock.Now() - p.connectedAt))
 	}()
 
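// A minimal sketch of the flow-control lifecycle after this change, assuming
// simplified stand-in types: peer, fcClient, newFCClient and handle are
// illustrative names only, not the real les/flowcontrol API. It shows the new
// shape of serverHandler.handle, where the flow-control client is created and
// released in the same function instead of inside clientPeer.Handshake (see
// the removal in les/peer.go above), and server peers never get one.
package main

import "fmt"

type fcClient struct{ id string }

func newFCClient(id string) *fcClient { fmt.Println("fc connect:", id); return &fcClient{id: id} }
func (c *fcClient) Disconnect()       { fmt.Println("fc disconnect:", c.id) }

type peer struct {
	id     string
	server bool // true if the remote peer is another les server
}

// handle mirrors the post-patch control flow: server peers return early and
// skip flow control entirely; for light clients the flow-control client is
// created right after that early return and released via defer.
func handle(p *peer) error {
	if p.server {
		// Connected to another server, no flow control needed.
		return nil
	}
	fc := newFCClient(p.id) // previously created inside the handshake
	defer fc.Disconnect()

	// ... serve light client requests here ...
	return nil
}

func main() {
	_ = handle(&peer{id: "server-peer", server: true})
	_ = handle(&peer{id: "light-client"})
}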
diff --git a/les/vflux/server/prioritypool.go b/les/vflux/server/prioritypool.go
index 573a3570a4ee..480f77e6afa1 100644
--- a/les/vflux/server/prioritypool.go
+++ b/les/vflux/server/prioritypool.go
@@ -63,20 +63,22 @@ type priorityPool struct {
 	ns                           *nodestate.NodeStateMachine
 	clock                        mclock.Clock
 	lock                         sync.Mutex
-	inactiveQueue                *prque.Prque
 	maxCount, maxCap             uint64
 	minCap                       uint64
 	activeBias                   time.Duration
 	capacityStepDiv, fineStepDiv uint64
 
+	// The cached snapshot of the priority pool, used for queries.
 	cachedCurve    *capacityCurve
 	ccUpdatedAt    mclock.AbsTime
 	ccUpdateForced bool
 
-	tempState []*ppNodeInfo // nodes currently in temporary state
-	// the following fields represent the temporary state if tempState is not empty
+	// Runtime status of the priority pool; these fields represent the
+	// temporary state if tempState is not empty
+	tempState              []*ppNodeInfo
 	activeCount, activeCap uint64
 	activeQueue            *prque.LazyQueue
+	inactiveQueue          *prque.Prque
 }
 
 // ppNodeInfo is the internal node descriptor of priorityPool
@@ -89,8 +91,9 @@ type ppNodeInfo struct {
 
 	tempState    bool   // should only be true while the priorityPool lock is held
 	tempCapacity uint64 // equals capacity when tempState is false
+
 	// the following fields only affect the temporary state and they are set to their
-	// default value when entering the temp state
+	// default value when leaving the temp state
 	minTarget, stepDiv uint64
 	bias               time.Duration
 }
@@ -157,11 +160,6 @@ func newPriorityPool(ns *nodestate.NodeStateMachine, setup *serverSetup, clock m
 func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget uint64, bias time.Duration) uint64 {
 	pp.lock.Lock()
 	pp.activeQueue.Refresh()
-	var updates []capUpdate
-	defer func() {
-		pp.lock.Unlock()
-		pp.updateFlags(updates)
-	}()
 
 	if minTarget < pp.minCap {
 		minTarget = pp.minCap
@@ -175,12 +173,13 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
 	c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
 	if c == nil {
 		log.Error("requestCapacity called for unknown node", "id", node.ID())
+		pp.lock.Unlock()
 		return 0
 	}
 	pp.setTempState(c)
 	if maxTarget > c.capacity {
-		c.bias = bias
-		c.stepDiv = pp.fineStepDiv
+		pp.setTempStepDiv(c, pp.fineStepDiv)
+		pp.setTempBias(c, bias)
 	}
 	pp.setTempCapacity(c, maxTarget)
 	c.minTarget = minTarget
@@ -188,7 +187,9 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
 	pp.inactiveQueue.Remove(c.inactiveIndex)
 	pp.activeQueue.Push(c)
 	pp.enforceLimits()
-	updates = pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
+	updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
+	pp.lock.Unlock()
+	pp.updateFlags(updates)
 	return c.capacity
 }
 
@@ -196,15 +197,11 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
 func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) {
 	pp.lock.Lock()
 	pp.activeQueue.Refresh()
-	var updates []capUpdate
-	defer func() {
-		pp.lock.Unlock()
-		pp.ns.Operation(func() { pp.updateFlags(updates) })
-	}()
-
 	inc := (maxCount > pp.maxCount) || (maxCap > pp.maxCap)
 	dec := (maxCount < pp.maxCount) || (maxCap < pp.maxCap)
 	pp.maxCount, pp.maxCap = maxCount, maxCap
+
+	var updates []capUpdate
 	if dec {
 		pp.enforceLimits()
 		updates = pp.finalizeChanges(true)
@@ -212,6 +209,8 @@ func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) {
 	if inc {
 		updates = append(updates, pp.tryActivate(false)...)
 	}
+	pp.lock.Unlock()
+	pp.ns.Operation(func() { pp.updateFlags(updates) })
 }
 
 // setActiveBias sets the bias applied when trying to activate inactive nodes
@@ -291,18 +290,15 @@ func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 {
 func (pp *priorityPool) connectedNode(c *ppNodeInfo) {
 	pp.lock.Lock()
 	pp.activeQueue.Refresh()
-	var updates []capUpdate
-	defer func() {
-		pp.lock.Unlock()
-		pp.updateFlags(updates)
-	}()
-
 	if c.connected {
+		pp.lock.Unlock()
 		return
 	}
 	c.connected = true
 	pp.inactiveQueue.Push(c, pp.inactivePriority(c))
-	updates = pp.tryActivate(false)
+	updates := pp.tryActivate(false)
+	pp.lock.Unlock()
+	pp.updateFlags(updates)
 }
 
 // disconnectedNode is called when a node has been removed from the pool (both inactiveFlag
@@ -311,23 +307,22 @@ func (pp *priorityPool) connectedNode(c *ppNodeInfo) {
 func (pp *priorityPool) disconnectedNode(c *ppNodeInfo) {
 	pp.lock.Lock()
 	pp.activeQueue.Refresh()
-	var updates []capUpdate
-	defer func() {
-		pp.lock.Unlock()
-		pp.updateFlags(updates)
-	}()
-
 	if !c.connected {
+		pp.lock.Unlock()
 		return
 	}
 	c.connected = false
 	pp.activeQueue.Remove(c.activeIndex)
 	pp.inactiveQueue.Remove(c.inactiveIndex)
+
+	var updates []capUpdate
 	if c.capacity != 0 {
 		pp.setTempState(c)
 		pp.setTempCapacity(c, 0)
 		updates = pp.tryActivate(true)
 	}
+	pp.lock.Unlock()
+	pp.updateFlags(updates)
 }
 
 // setTempState internally puts a node in a temporary state that can either be reverted
@@ -342,27 +337,62 @@ func (pp *priorityPool) setTempState(c *ppNodeInfo) {
 	if c.tempCapacity != c.capacity { // should never happen
 		log.Error("tempCapacity != capacity when entering tempState")
 	}
+	// Assign all the defaults to the temp state.
 	c.minTarget = pp.minCap
 	c.stepDiv = pp.capacityStepDiv
+	c.bias = 0
 	pp.tempState = append(pp.tempState, c)
 }
 
+// unsetTempState revokes the temp status of the node and resets all internal
+// fields to their default values.
+func (pp *priorityPool) unsetTempState(c *ppNodeInfo) {
+	if !c.tempState {
+		return
+	}
+	c.tempState = false
+	if c.tempCapacity != c.capacity { // should never happen
+		log.Error("tempCapacity != capacity when leaving tempState")
+	}
+	c.minTarget = pp.minCap
+	c.stepDiv = pp.capacityStepDiv
+	c.bias = 0
+}
+
 // setTempCapacity changes the capacity of a node in the temporary state and adjusts
 // activeCap and activeCount accordingly. Since this change is performed in the temporary
 // state it should be called after setTempState and before finalizeChanges.
-func (pp *priorityPool) setTempCapacity(n *ppNodeInfo, cap uint64) {
-	if !n.tempState { // should never happen
+func (pp *priorityPool) setTempCapacity(c *ppNodeInfo, cap uint64) {
+	if !c.tempState { // should never happen
 		log.Error("Node is not in temporary state")
 		return
 	}
-	pp.activeCap += cap - n.tempCapacity
-	if n.tempCapacity == 0 {
+	pp.activeCap += cap - c.tempCapacity
+	if c.tempCapacity == 0 {
 		pp.activeCount++
 	}
 	if cap == 0 {
 		pp.activeCount--
 	}
-	n.tempCapacity = cap
+	c.tempCapacity = cap
+}
+
+// setTempBias changes the connection bias of a node in the temporary state.
+func (pp *priorityPool) setTempBias(c *ppNodeInfo, bias time.Duration) {
+	if !c.tempState { // should never happen
+		log.Error("Node is not in temporary state")
+		return
+	}
+	c.bias = bias
+}
+
+// setTempStepDiv changes the capacity divisor of a node in the temporary state.
+func (pp *priorityPool) setTempStepDiv(c *ppNodeInfo, stepDiv uint64) {
+	if !c.tempState { // should never happen
+		log.Error("Node is not in temporary state")
+		return
+	}
+	c.stepDiv = stepDiv
 }
 
 // enforceLimits enforces active node count and total capacity limits. It returns the
@@ -412,10 +442,8 @@ func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
 		} else {
 			pp.setTempCapacity(c, c.capacity) // revert activeCount/activeCap
 		}
-		c.tempState = false
-		c.bias = 0
-		c.stepDiv = pp.capacityStepDiv
-		c.minTarget = pp.minCap
+		pp.unsetTempState(c)
+
 		if c.connected {
 			if c.capacity != 0 {
 				pp.activeQueue.Push(c)
@@ -462,13 +490,13 @@ func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
 	for pp.inactiveQueue.Size() > 0 {
 		c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
 		pp.setTempState(c)
+		pp.setTempBias(c, pp.activeBias)
 		pp.setTempCapacity(c, pp.minCap)
-		c.bias = pp.activeBias
 		pp.activeQueue.Push(c)
 		pp.enforceLimits()
 		if c.tempCapacity > 0 {
 			commit = true
-			c.bias = 0
+			pp.setTempBias(c, 0)
 		} else {
 			break
 		}
@@ -483,14 +511,9 @@ func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
 func (pp *priorityPool) updatePriority(node *enode.Node) {
 	pp.lock.Lock()
 	pp.activeQueue.Refresh()
-	var updates []capUpdate
-	defer func() {
-		pp.lock.Unlock()
-		pp.updateFlags(updates)
-	}()
-
 	c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
 	if c == nil || !c.connected {
+		pp.lock.Unlock()
 		return
 	}
 	pp.activeQueue.Remove(c.activeIndex)
@@ -500,7 +523,9 @@ func (pp *priorityPool) updatePriority(node *enode.Node) {
 	} else {
 		pp.inactiveQueue.Push(c, pp.inactivePriority(c))
 	}
-	updates = pp.tryActivate(false)
+	updates := pp.tryActivate(false)
+	pp.lock.Unlock()
+	pp.updateFlags(updates)
 }
 
 // capacityCurve is a snapshot of the priority pool contents in a format that can efficiently
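// A minimal sketch of the locking pattern that prioritypool.go adopts in this
// patch: mutate pool state while holding the mutex, collect the resulting
// capacity updates, then release the lock before notifying, so the flag
// updates that drive the node state machine never run under pp.lock and
// early-return paths unlock explicitly instead of relying on a deferred
// unlock-plus-notify closure. pool, capUpdate, setCapacity and notify are
// simplified stand-ins for illustration only, not the real vflux/server types.
package main

import (
	"fmt"
	"sync"
)

type capUpdate struct {
	node           string
	oldCap, newCap uint64
}

type pool struct {
	lock sync.Mutex
	caps map[string]uint64
}

// setCapacity follows the same shape as requestCapacity, SetLimits and
// connectedNode after the patch: lock, mutate, collect updates, unlock,
// then notify outside the critical section.
func (p *pool) setCapacity(node string, cap uint64) {
	p.lock.Lock()
	old, ok := p.caps[node]
	if !ok {
		p.lock.Unlock() // early-return paths unlock explicitly
		return
	}
	p.caps[node] = cap
	updates := []capUpdate{{node: node, oldCap: old, newCap: cap}}
	p.lock.Unlock()
	p.notify(updates) // analogous to pp.updateFlags(updates), outside the lock
}

func (p *pool) notify(updates []capUpdate) {
	for _, u := range updates {
		fmt.Printf("capacity of %s changed: %d -> %d\n", u.node, u.oldCap, u.newCap)
	}
}

func main() {
	p := &pool{caps: map[string]uint64{"n1": 100}}
	p.setCapacity("n1", 250)
	p.setCapacity("unknown", 50) // no-op, but still releases the lock
}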