Skip to content

Commit

Permalink
use ring of connections instead of slice, use multi relays in tfplugi…
Browse files Browse the repository at this point in the history
…n, add atomic bool to watch busy connections
  • Loading branch information
rawdaGastan committed May 1, 2024
1 parent 3b0f53b commit bbd7e29
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 31 deletions.
24 changes: 17 additions & 7 deletions grid-client/deployer/tf_plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,22 @@ var (
"qa": "https://graphql.qa.grid.tf/graphql",
"main": "https://graphql.grid.tf/graphql",
}
// RelayURLS relay urls
RelayURLS = map[string][]string{
"dev": {"wss://relay.dev.grid.tf", "wss://relay.02.dev.grid.tf"},
"test": {"wss://relay.test.grid.tf", "wss://relay.02.test.grid.tf"},
"qa": {"wss://relay.qa.grid.tf"},
"main": {"wss://relay.grid.tf"},
// RelayURLs relay urls
RelayURLs = map[string][]string{
"dev": {
"wss://relay.dev.grid.tf",
"wss://relay.02.dev.grid.tf",
},
"test": {
"wss://relay.test.grid.tf",
"wss://relay.02.test.grid.tf",
},
"qa": {
"wss://relay.qa.grid.tf",
},
"main": {
"wss://relay.grid.tf",
},
}
)

Expand Down Expand Up @@ -180,7 +190,7 @@ func parsePluginOpts(opts ...PluginOpt) (pluginCfg, error) {
}

if len(cfg.relayURLs) == 0 {
cfg.relayURLs = RelayURLS[cfg.network]
cfg.relayURLs = RelayURLs[cfg.network]
}
for _, url := range cfg.relayURLs {
if err := validateWssURL(url); err != nil {
Expand Down
16 changes: 7 additions & 9 deletions rmb-sdk-go/peer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand All @@ -24,7 +25,7 @@ type InnerConnection struct {
session string
identity substrate.Identity
url string
busy bool
busy *atomic.Bool
writer chan send
}

Expand Down Expand Up @@ -55,6 +56,7 @@ func NewConnection(identity substrate.Identity, url string, session string, twin
url: url,
session: session,
writer: make(chan send),
busy: &atomic.Bool{},
}
}

Expand Down Expand Up @@ -82,7 +84,7 @@ func (c *InnerConnection) reader(ctx context.Context, cancel context.CancelFunc,
}

func (c *InnerConnection) send(ctx context.Context, data []byte) error {
if c.busy {
if c.busy.Load() {
return fmt.Errorf("connection is busy")
}

Expand Down Expand Up @@ -133,12 +135,6 @@ func (c *InnerConnection) loop(ctx context.Context, con *websocket.Conn, output
output <- data
lastPong = time.Now()
case sent := <-c.writer:
// should we remove this flag?
c.busy = true
defer func() {
c.busy = false
}()

if err := con.WriteMessage(websocket.BinaryMessage, sent.data); err != nil {
select {
case sent.err <- err:
Expand All @@ -147,9 +143,9 @@ func (c *InnerConnection) loop(ctx context.Context, con *websocket.Conn, output
return ctx.Err()
}
}

select {
case sent.err <- nil:
c.busy = false
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -187,10 +183,12 @@ func (c *InnerConnection) Start(ctx context.Context, output chan []byte) {

// listenAndServe creates the websocket connection, and if successful, listens for and serves incoming and outgoing messages.
func (c *InnerConnection) listenAndServe(ctx context.Context, output chan []byte) error {
c.busy.Store(true)
con, err := c.connect()
if err != nil {
return errors.Wrap(err, "failed to reconnect")
}
c.busy.Store(false)

return c.loop(ctx, con, output)
}
Expand Down
2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/examples/peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func app() error {
return nil
}

func relayCallback(ctx context.Context, p peer.Peer, response *types.Envelope, callBackErr error) {
func relayCallback(ctx context.Context, p *peer.Peer, response *types.Envelope, callBackErr error) {
output, err := peer.Json(response, callBackErr)
if err != nil {
log.Error().Err(err).Send()
Expand Down
2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/examples/peer_pingmany/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
wg.Add(count)

received := 0
handler := func(ctx context.Context, peer peer.Peer, env *types.Envelope, err error) {
handler := func(ctx context.Context, peer *peer.Peer, env *types.Envelope, err error) {
received += 1
log.Info().Int("received", received).Msg("received responses so far")
defer wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/examples/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func app() error {
// NOTE: we calling the service 'calculator' session
// as per the router_server example
service := "calculator"
const dst = 7 // <- replace this with the twin id of where the service is running
const dst = 81 // <- replace this with the twin id of where the service is running
// it's okay to run both the server and the client behind the same rmb-peer
var output float64
for i := 0; i < 20; i++ {
Expand Down
27 changes: 17 additions & 10 deletions rmb-sdk-go/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peer

import (
"bytes"
"container/ring"
"context"
"crypto/aes"
"crypto/cipher"
Expand All @@ -13,6 +14,7 @@ import (
"slices"
"sort"
"strings"
"sync"
"time"

"github.com/decred/dcrd/dcrec/secp256k1/v4"
Expand All @@ -33,7 +35,7 @@ const (

// Handler is a call back that is called with verified and decrypted incoming
// messages. An error can be non-nil error if verification or decryption failed
type Handler func(ctx context.Context, peer Peer, env *types.Envelope, err error)
type Handler func(ctx context.Context, peer *Peer, env *types.Envelope, err error)

type cacheFactory = func(inner TwinDB, chainURL string) (TwinDB, error)

Expand Down Expand Up @@ -104,7 +106,8 @@ type Peer struct {
twinDB TwinDB
privKey *secp256k1.PrivateKey
reader Reader
cons []InnerConnection
cons *ring.Ring
mu sync.Mutex
handler Handler
encoder encoder.Encoder
}
Expand Down Expand Up @@ -241,11 +244,12 @@ func NewPeer(

reader := make(chan []byte)

var cons []InnerConnection
cons := ring.New(len(cfg.relayURLs))
for _, url := range cfg.relayURLs {
conn := NewConnection(identity, url, cfg.session, twin.ID)
conn.Start(ctx, reader)
cons = append(cons, conn)
cons.Value = conn
cons = cons.Next()
}

var sessionP *string
Expand Down Expand Up @@ -278,7 +282,7 @@ func (p *Peer) Encoder() encoder.Encoder {
return p.encoder
}

func (d Peer) handleIncoming(incoming *types.Envelope) error {
func (d *Peer) handleIncoming(incoming *types.Envelope) error {
errResp := incoming.GetError()
if incoming.Source == nil {
// an envelope received that has NO source twin
Expand Down Expand Up @@ -338,7 +342,7 @@ func (d *Peer) process(ctx context.Context) {
}
// verify and decoding!
err := d.handleIncoming(&env)
d.handler(ctx, *d, &env, err)
d.handler(ctx, d, &env, err)
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -489,20 +493,23 @@ func (d *Peer) send(ctx context.Context, request *types.Envelope) error {
}

var errs error
for _, con := range d.cons {

for i := 0; i < d.cons.Len(); i++ {
con := d.cons.Value.(InnerConnection)
err := con.send(ctx, bytes)
if err != nil {
errs = multierror.Append(errs, err)
continue
}

// move successful connection to first pos
// d.cons = append([]InnerConnection{con}, append(d.cons[:i], d.cons[i+1:]...)...)
d.mu.Lock()
d.cons = d.cons.Next()
d.mu.Unlock()

return nil
}

return errs

}

// SendRequest sends an rmb message to the relay
Expand Down
2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *Router) Use(mw Middleware) {
r.mw = append(r.mw, mw)
}

func (r *Router) Serve(ctx context.Context, peer Peer, env *types.Envelope, err error) {
func (r *Router) Serve(ctx context.Context, peer *Peer, env *types.Envelope, err error) {
if err != nil {
log.Error().Err(err).Msg("bad request")
return
Expand Down
2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewRpcClient(
return &rpc, nil
}

func (d *RpcClient) router(ctx context.Context, peer Peer, env *types.Envelope, err error) {
func (d *RpcClient) router(ctx context.Context, peer *Peer, env *types.Envelope, err error) {
d.m.RLock()
defer d.m.RUnlock()

Expand Down

0 comments on commit bbd7e29

Please sign in to comment.