Skip to content

Commit

Permalink
Merge branch 'development' into kishan/telemetry/afg
Browse files Browse the repository at this point in the history
  • Loading branch information
kishansagathiya committed Nov 30, 2021
2 parents 6a7cdfb + ac16285 commit 09e3826
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# CODEOWNERS: https://help.github.com/articles/about-codeowners/

# Primary repo maintainers
* @noot @arijitAD @edwardmack @timwu20 @EclesioMeloJunior @jimjbrettj @kishansagathiya @omar391
* @noot @arijitAD @edwardmack @timwu20 @EclesioMeloJunior @jimjbrettj @kishansagathiya @qdm12
34 changes: 1 addition & 33 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package network

import (
"context"
"crypto/rand"
"math/big"
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
Expand Down Expand Up @@ -130,40 +128,10 @@ func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID {
func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
logger.Tracef(
"Host %s connected to peer %s", n.LocalPeer(), c.RemotePeer())

if cm.connectHandler != nil {
cm.connectHandler(c.RemotePeer())
}

cm.Lock()
defer cm.Unlock()

over := len(n.Peers()) - cm.max
if over <= 0 {
return
}

// TODO: peer scoring doesn't seem to prevent us from going over the max.
// if over the max peer count, disconnect from (total_peers - maximum) peers
// (#2039)
for i := 0; i < over; i++ {
unprotPeers := cm.unprotectedPeers(n.Peers())
if len(unprotPeers) == 0 {
return
}

i, err := rand.Int(rand.Reader, big.NewInt(int64(len(unprotPeers))))
if err != nil {
logger.Errorf("error generating random number: %s", err)
return
}

up := unprotPeers[i.Int64()]
logger.Tracef("Over max peer count, disconnecting from random unprotected peer %s", up)
err = n.ClosePeer(up)
if err != nil {
logger.Tracef("failed to close connection to peer %s", up)
}
}
}

// Disconnected is called when a connection closed
Expand Down
12 changes: 5 additions & 7 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package network
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
Expand Down Expand Up @@ -670,6 +669,10 @@ func (s *Service) startPeerSetHandler() {

func (s *Service) processMessage(msg peerset.Message) {
peerID := msg.PeerID
if peerID == "" {
logger.Errorf("found empty peer id in peerset message")
return
}
switch msg.Status {
case peerset.Connect:
addrInfo := s.host.h.Peerstore().PeerInfo(peerID)
Expand Down Expand Up @@ -704,12 +707,7 @@ func (s *Service) startProcessingMsg() {
select {
case <-s.ctx.Done():
return
case m := <-msgCh:
msg, ok := m.(peerset.Message)
if !ok {
logger.Error(fmt.Sprintf("failed to get message from peerSet: type is %T instead of peerset.Message", m))
continue
}
case msg := <-msgCh:
s.processMessage(msg)
}
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ type PeerRemove interface {
type Peer interface {
PeerReputation(peer.ID) (peerset.Reputation, error)
SortedPeers(idx int) chan peer.IDSlice
Messages() chan interface{}
Messages() chan peerset.Message
}
6 changes: 4 additions & 2 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package peerset

import "github.com/libp2p/go-libp2p-core/peer"
import (
"github.com/libp2p/go-libp2p-core/peer"
)

// Handler manages peerSet.
type Handler struct {
Expand Down Expand Up @@ -88,7 +90,7 @@ func (h *Handler) Incoming(setID int, peers ...peer.ID) {
}

// Messages return result message chan.
func (h *Handler) Messages() chan interface{} {
func (h *Handler) Messages() chan Message {
return h.peerSet.resultMsgCh
}

Expand Down
45 changes: 33 additions & 12 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type PeerSet struct {
// TODO: this will be useful for reserved only mode
// this is for future purpose if reserved-only flag is enabled (#1888).
isReservedOnly bool
resultMsgCh chan interface{}
resultMsgCh chan Message
// time when the PeerSet was created.
created time.Time
// last time when we updated the reputations of connected nodes.
Expand Down Expand Up @@ -183,6 +183,8 @@ func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *C
}

return &ConfigSet{
// Why are we using an array of config in the set, when we are
// using just one config
Set: []*config{set},
}
}
Expand Down Expand Up @@ -228,8 +230,8 @@ func reputationTick(reput Reputation) Reputation {
return reput.sub(diff)
}

// updateTime updates the value of latestTimeUpdate and performs all the updates that happen
// over time, such as Reputation increases for staying connected.
// updateTime updates the value of latestTimeUpdate and performs all the updates that
// happen over time, such as Reputation increases for staying connected.
func (ps *PeerSet) updateTime() error {
currTime := time.Now()
// identify the time difference between current time and last update time for peer reputation in seconds.
Expand Down Expand Up @@ -282,8 +284,8 @@ func (ps *PeerSet) updateTime() error {
}

// reportPeer on report ReputationChange of the peer based on its behaviour,
// if the updated Reputation is below BannedThresholdValue then, this node need to be disconnected
// and a drop message for the peer is sent in order to disconnect.
// if the updated Reputation is below BannedThresholdValue then, this node need to
// be disconnected and a drop message for the peer is sent in order to disconnect.
func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error {
// we want reputations to be up-to-date before adjusting them.
if err := ps.updateTime(); err != nil {
Expand Down Expand Up @@ -516,8 +518,9 @@ func (ps *PeerSet) removePeer(setID int, peers ...peer.ID) error {
return nil
}

// incoming indicates that we have received an incoming connection. Must be answered either with
// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
// incoming indicates that we have received an incoming connection. Must be answered
// either with a corresponding `Accept` or `Reject`, except if we were already
// connected to this peer.
func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
if err := ps.updateTime(); err != nil {
return err
Expand All @@ -527,7 +530,11 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
for _, pid := range peers {
if ps.isReservedOnly {
if _, ok := ps.reservedNode[pid]; !ok {
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
continue
}
}
Expand All @@ -546,11 +553,24 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
p := state.nodes[pid]
switch {
case p.getReputation() < BannedThresholdValue:
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
case state.tryAcceptIncoming(setID, pid) != nil:
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
default:
ps.resultMsgCh <- Message{Status: Accept}
logger.Debugf("incoming connection accepted from peer %s", pid)
ps.resultMsgCh <- Message{
Status: Accept,
setID: uint64(setID),
PeerID: pid,
}
}
}

Expand Down Expand Up @@ -593,6 +613,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e
}
ps.resultMsgCh <- Message{
Status: Drop,
setID: uint64(setIdx),
PeerID: pid,
}

Expand All @@ -610,7 +631,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e
// start handles all the action for the peerSet.
func (ps *PeerSet) start(aq chan action) {
ps.actionQueue = aq
ps.resultMsgCh = make(chan interface{}, msgChanSize)
ps.resultMsgCh = make(chan Message, msgChanSize)
go ps.doWork()
}

Expand Down
4 changes: 1 addition & 3 deletions dot/peerset/peerset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ func TestAddReservedPeers(t *testing.T) {
if len(ps.resultMsgCh) == 0 {
break
}
m := <-ps.resultMsgCh
msg, ok := m.(Message)
require.True(t, ok)
msg := <-ps.resultMsgCh
require.Equal(t, expectedMsgs[i], msg)
}
}
Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/modules/author.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ type decodedKey struct {
type ExtrinsicStatus struct {
IsFuture bool
IsReady bool
Isfinalised bool
Asfinalised common.Hash
IsFinalized bool
AsFinalized common.Hash
IsUsurped bool
AsUsurped common.Hash
IsBroadcast bool
Expand Down
3 changes: 2 additions & 1 deletion lib/babe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ func (b *BlockBuilder) buildBlockExtrinsics(slot Slot, rt runtime.Instance) []*t
func (b *BlockBuilder) buildBlockInherents(slot Slot, rt runtime.Instance) ([][]byte, error) {
// Setup inherents: add timstap0
idata := types.NewInherentsData()
err := idata.SetInt64Inherent(types.Timstap0, uint64(time.Now().Unix()))
timestamp := uint64(time.Now().UnixMilli())
err := idata.SetInt64Inherent(types.Timstap0, timestamp)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion lib/runtime/wasmer/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -1906,10 +1906,14 @@ func ext_storage_clear_prefix_version_2(context unsafe.Pointer, prefixSpan, lim
return C.int64_t(ret)
}

if len(limit) == 0 {
// limit is None, set limit to max
limit = []byte{0xff, 0xff, 0xff, 0xff}
}

limitUint := binary.LittleEndian.Uint32(limit)
numRemoved, all := storage.ClearPrefixLimit(prefix, limitUint)
encBytes, err := toKillStorageResultEnum(all, numRemoved)

if err != nil {
logger.Errorf("[ext_storage_clear_prefix_version_2] failed to allocate memory: %s", err)
ret, _ := toWasmMemory(instanceContext, nil)
Expand Down

0 comments on commit 09e3826

Please sign in to comment.