refactor: Network test sync logic #2748

Merged · 19 commits · Jul 18, 2024
13 changes: 13 additions & 0 deletions net/peer.go
@@ -24,6 +24,7 @@ import (
"github.com/ipfs/boxo/blockservice"
exchange "github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/ipns"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
libp2p "github.com/libp2p/go-libp2p"
@@ -446,6 +447,13 @@ func (p *Peer) handleDocUpdateLog(evt event.Update) error {
}

func (p *Peer) pushLogToReplicators(lg event.Update) {
// let the exchange know we have this block
// this should speed up the dag sync process
err := p.bserv.Exchange().NotifyNewBlocks(p.ctx, blocks.NewBlock(lg.Block))
if err != nil {
log.ErrorContextE(p.ctx, "Failed to notify new blocks", err)
}

// push to each peer (replicator)
peers := make(map[string]struct{})
for _, peer := range p.ps.ListPeers(lg.DocID) {
@@ -504,6 +512,11 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) {
}
}

// Connect initiates a connection to the peer with the given address.
func (p *Peer) Connect(ctx context.Context, addr peer.AddrInfo) error {
return p.host.Connect(ctx, addr)
}

// Bootstrap connects to the given peers.
func (p *Peer) Bootstrap(addrs []peer.AddrInfo) {
var connected uint64
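Note on the peer.go change above: the new NotifyNewBlocks call tells the blockservice's exchange (bitswap) that the freshly composed update block is available locally, so replicators that begin a DAG sync for this update can be served it directly. A minimal sketch of that pattern under the same assumption (a boxo blockservice backed by a bitswap exchange); the helper below is illustrative, not the PR's code:

package netsketch

import (
	"context"
	"fmt"

	"github.com/ipfs/boxo/blockservice"
	blocks "github.com/ipfs/go-block-format"
)

// announceBlock wraps raw update bytes in a block, stores it, and notifies
// the exchange, mirroring the NotifyNewBlocks call added to
// pushLogToReplicators. AddBlock is included only to keep the sketch
// self-contained; in the PR the block is already persisted by the DAG layer.
func announceBlock(ctx context.Context, bserv blockservice.BlockService, payload []byte) error {
	blk := blocks.NewBlock(payload) // hashes the payload and derives its CID

	if err := bserv.AddBlock(ctx, blk); err != nil {
		return fmt.Errorf("store block: %w", err)
	}

	// Tell bitswap the block now exists locally so peers with a pending
	// want for this CID can be answered without an extra round trip.
	if err := bserv.Exchange().NotifyNewBlocks(ctx, blk); err != nil {
		return fmt.Errorf("notify exchange: %w", err)
	}
	return nil
}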
75 changes: 57 additions & 18 deletions net/server.go
@@ -125,18 +125,24 @@
if err != nil {
return nil, err
}

log.InfoContext(ctx, "Received pushlog",
corelog.Any("PeerID", pid.String()),
corelog.Any("Creator", byPeer.String()),
corelog.Any("DocID", docID.String()))

log.InfoContext(ctx, "Starting DAG sync",
corelog.Any("PeerID", pid.String()),
corelog.Any("DocID", docID.String()))

err = syncDAG(ctx, s.peer.bserv, block)
if err != nil {
return nil, err
}

s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{
DocID: docID.String(),
ByPeer: byPeer,
FromPeer: pid,
Cid: headCID,
SchemaRoot: string(req.Body.SchemaRoot),
}))
log.InfoContext(ctx, "DAG sync complete",
corelog.Any("PeerID", pid.String()),
corelog.Any("DocID", docID.String()))

// Once processed, subscribe to the DocID topic on the pubsub network unless we already
// subscribe to the collection.
@@ -146,6 +152,15 @@
return nil, err
}
}

s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{
DocID: docID.String(),
ByPeer: byPeer,
FromPeer: pid,
Cid: headCID,
SchemaRoot: string(req.Body.SchemaRoot),
}))

return &pb.PushLogReply{}, nil
}
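The reworked handler keeps the same steps but changes their order and logging: the DAG sync is now bracketed by start/complete logs, and the merge event is published only after the DocID topic subscription has been ensured, presumably so the merge is never announced before the node is listening on that topic. A rough sketch of the resulting control flow; the helper names below are placeholders for syncDAG, the topic subscription, and the event-bus publish, not the PR's actual signatures:

package netsketch

import "context"

// Placeholder stubs so the sketch compiles; the real handler calls syncDAG,
// addPubSubTopic and an event-bus Publish with richer arguments.
func syncDocDAG(ctx context.Context, docID string) error          { return nil }
func subscribeToDocTopic(ctx context.Context, docID string) error { return nil }
func publishMergeEvent(docID string)                              {}

// handlePushLogOrder mirrors the new ordering in the diff: sync the DAG,
// then ensure the DocID topic subscription exists, and only then publish
// the merge event (the rationale for this ordering is an assumption here).
func handlePushLogOrder(ctx context.Context, docID string) error {
	if err := syncDocDAG(ctx, docID); err != nil {
		return err
	}
	if err := subscribeToDocTopic(ctx, docID); err != nil {
		return err
	}
	publishMergeEvent(docID)
	return nil
}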

@@ -163,6 +178,10 @@
return nil
}

log.InfoContext(s.peer.ctx, "Adding pubsub topic",
corelog.String("PeerID", s.peer.PeerID().String()),
corelog.String("Topic", topic))

s.mu.Lock()
defer s.mu.Unlock()
if t, ok := s.topics[topic]; ok {
@@ -205,6 +224,10 @@
return nil
}

log.InfoContext(s.peer.ctx, "Removing pubsub topic",
corelog.String("PeerID", s.peer.PeerID().String()),
corelog.String("Topic", topic))

s.mu.Lock()
defer s.mu.Unlock()
if t, ok := s.topics[topic]; ok {
@@ -218,6 +241,10 @@
if s.peer.ps == nil {
return nil
}

log.InfoContext(s.peer.ctx, "Removing all pubsub topics",
corelog.String("PeerID", s.peer.PeerID().String()))

s.mu.Lock()
defer s.mu.Unlock()
for id, t := range s.topics {
@@ -232,6 +259,10 @@
// publishLog publishes the given PushLogRequest object on the PubSub network via the
// corresponding topic
func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRequest) error {
log.InfoContext(ctx, "Publish log",
corelog.String("PeerID", s.peer.PeerID().String()),
corelog.String("Topic", topic))

if s.peer.ps == nil { // skip if we aren't running with a pubsub net
return nil
}
@@ -259,12 +290,16 @@

// pubSubMessageHandler handles incoming PushLog messages from the pubsub network.
func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ([]byte, error) {
log.InfoContext(s.peer.ctx, "Received new pubsub event",
corelog.String("PeerID", s.peer.PeerID().String()),
corelog.Any("SenderId", from),
corelog.String("Topic", topic))

req := new(pb.PushLogRequest)
if err := proto.Unmarshal(msg, req); err != nil {
log.ErrorContextE(s.peer.ctx, "Failed to unmarshal pubsub message %s", err)
return nil, err
}

ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{
Addr: addr{from},
})
@@ -276,9 +311,8 @@

// pubSubEventHandler logs events from the subscribed DocID topics.
func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) {
log.InfoContext(
s.peer.ctx,
"Received new pubsub event",
log.InfoContext(s.peer.ctx, "Received new pubsub event",
corelog.String("PeerID", s.peer.PeerID().String()),
corelog.Any("SenderId", from),
corelog.String("Topic", topic),
corelog.String("Message", string(msg)),
@@ -329,7 +363,18 @@
}

func (s *server) updateReplicators(evt event.Replicator) {
isDeleteRep := len(evt.Schemas) == 0
if len(evt.Schemas) == 0 {
// remove peer from store
s.peer.host.Peerstore().ClearAddrs(evt.Info.ID)
} else {
// add peer to store
s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL)
// connect to the peer
if err := s.peer.Connect(s.peer.ctx, evt.Info); err != nil {
log.ErrorContextE(s.peer.ctx, "Failed to connect to replicator peer", err)

Codecov / codecov/patch warning on net/server.go#L374: added line was not covered by tests.
}
}

// update the cached replicators
s.mu.Lock()
for schema, peers := range s.replicators {
@@ -350,12 +395,6 @@
}
s.mu.Unlock()

if isDeleteRep {
s.peer.host.Peerstore().ClearAddrs(evt.Info.ID)
} else {
s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL)
}

if evt.Docs != nil {
for update := range evt.Docs {
if err := s.pushLog(s.peer.ctx, update, evt.Info.ID); err != nil {
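updateReplicators now does the peerstore bookkeeping before updating the cached replicator map: an empty schema list (a removed replicator) clears the stored addresses, while an added replicator gets its addresses pinned with a permanent TTL and is dialed immediately via the new Peer.Connect helper. A small sketch of that bookkeeping against a plain libp2p host; the function and the main below are illustrative, not the PR's code:

package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
)

// updateReplicatorPeer mirrors the peerstore handling in updateReplicators:
// an empty schema list means the replicator was removed, so its addresses
// are dropped; otherwise the addresses are kept permanently and a
// connection is attempted right away.
func updateReplicatorPeer(ctx context.Context, h host.Host, info peer.AddrInfo, schemas []string) error {
	if len(schemas) == 0 {
		h.Peerstore().ClearAddrs(info.ID)
		return nil
	}
	h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
	return h.Connect(ctx, info)
}

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// With no schemas the call only clears any stored addresses for the peer.
	if err := updateReplicatorPeer(context.Background(), h, peer.AddrInfo{ID: h.ID()}, nil); err != nil {
		fmt.Println("update failed:", err)
	}
}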
4 changes: 4 additions & 0 deletions tests/integration/acp/register_and_update_test.go
@@ -650,6 +650,8 @@ func TestACP_CreateWithIdentityAndUpdateWithoutIdentityGQL_CanNotUpdate(t *testi
"name": "Shahzad Lone"
}
`,

SkipUpdateEvent: true,
},

testUtils.Request{
Expand Down Expand Up @@ -764,6 +766,8 @@ func TestACP_CreateWithIdentityAndUpdateWithWrongIdentityGQL_CanNotUpdate(t *tes
"name": "Shahzad Lone"
}
`,

SkipUpdateEvent: true,
},

testUtils.Request{