Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement keepalive routine with ping-ponging to ws connection in ws controller #6757

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
81ddee5
Added Websocket connection configurating
UlyanaAndrukhiv Nov 20, 2024
808b54b
Updated configureConnection and godoc
UlyanaAndrukhiv Nov 21, 2024
6c5ab5d
Adedd SetWriteDeadline before write operation
UlyanaAndrukhiv Nov 21, 2024
eec15e5
Set initital read deadline, updated godoc
UlyanaAndrukhiv Nov 21, 2024
fd567aa
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 21, 2024
098c10d
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 22, 2024
917bbde
Implemented ping-pong ws routine, refactored shutdownConnection
UlyanaAndrukhiv Nov 22, 2024
438b130
Merge branch 'UlyanaAndrukhiv/6638-ws-connection-configuring' of gith…
UlyanaAndrukhiv Nov 22, 2024
86cdb35
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into UlyanaAn…
UlyanaAndrukhiv Nov 25, 2024
ec4e247
Added more comments and updated godoc
UlyanaAndrukhiv Nov 25, 2024
eae6bbf
Moved constants to new websockets package according to comment
UlyanaAndrukhiv Nov 25, 2024
4e2d35c
Merge branch 'UlyanaAndrukhiv/6638-ws-connection-configuring' of gith…
UlyanaAndrukhiv Nov 25, 2024
9971188
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 25, 2024
6cd2841
Merged with UlyanaAndrukhiv/6638-ws-connection-configuring
UlyanaAndrukhiv Nov 25, 2024
c90d75f
Updated according to comments, added unit tests for ping-pong functio…
UlyanaAndrukhiv Nov 26, 2024
afc8648
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Nov 26, 2024
040a949
Updated WriteMessage to WriteControl for Ping messages, updated mocks…
UlyanaAndrukhiv Nov 27, 2024
357dc2f
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Nov 27, 2024
276ea7e
Added tests for keepalive, configure connection, graceful shutdown, a…
UlyanaAndrukhiv Nov 28, 2024
077c543
Merge branch 'UlyanaAndrukhiv/6639-ws-ping-pong' of github.com:The-K-…
UlyanaAndrukhiv Nov 28, 2024
21259ce
Added happy case test for keepalive
UlyanaAndrukhiv Nov 28, 2024
1f5728d
Updated unit test for keep alive
UlyanaAndrukhiv Nov 28, 2024
f384b0a
Removed sendPing abstraction, updated godoc according to comments
UlyanaAndrukhiv Dec 2, 2024
66d0607
Updated last commit
UlyanaAndrukhiv Dec 2, 2024
3cfe98b
Extended godoc
UlyanaAndrukhiv Dec 2, 2024
3285b23
Updated controller according to comments
UlyanaAndrukhiv Dec 3, 2024
d58446d
Moved ping pong constants
UlyanaAndrukhiv Dec 3, 2024
f3d0bb8
Removed unused errorChannel for controller
UlyanaAndrukhiv Dec 3, 2024
9d07387
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 3, 2024
2e8f1bc
Updated controller and tests according to comments
UlyanaAndrukhiv Dec 4, 2024
08c902b
Merged with master
UlyanaAndrukhiv Dec 4, 2024
c38fcf5
Renamed method, updated tests
UlyanaAndrukhiv Dec 4, 2024
d0ccb45
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 5, 2024
5aee1ca
Refactored logging the error message for controller
UlyanaAndrukhiv Dec 5, 2024
17d40b6
Updated HandleConnection to always shut down the connection
UlyanaAndrukhiv Dec 5, 2024
29fe648
Updated behaviour of controller for context canceled
UlyanaAndrukhiv Dec 5, 2024
7012c3a
Cleaned up the code
UlyanaAndrukhiv Dec 5, 2024
83f070f
Updated duration according to comment
UlyanaAndrukhiv Dec 5, 2024
8ca674f
Merge branch 'UlyanaAndrukhiv/6639-ws-ping-pong' of github.com:The-K-…
UlyanaAndrukhiv Dec 5, 2024
f07807e
Added comment for test according suggestion
UlyanaAndrukhiv Dec 5, 2024
08e5f73
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 6, 2024
a044577
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 16, 2024
cfad79f
Updated godoc to latest version, added suggestions to websocket contr…
UlyanaAndrukhiv Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ generate-mocks: install-mock-generators
mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
mockery --name 'DataProviderFactory' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
mockery --name 'WebsocketConnection' --dir="./engine/access/rest/websockets" --case=underscore --output="./engine/access/rest/websockets/mock" --outpkg="mock"
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"
Expand Down
27 changes: 27 additions & 0 deletions engine/access/rest/websockets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,33 @@ import (
"time"
)

const (
	// PongWait specifies the maximum time to wait for a pong message from the
	// peer: the connection's read deadline is set this far ahead initially and
	// again whenever a pong arrives.
	PongWait = 10 * time.Second

	// PingPeriod is the interval at which ping messages are sent to the client.
	// It must be shorter than PongWait, because only then is a ping guaranteed
	// to go out well before the PongWait timeout elapses. Every pong received
	// resets the server's read deadline, so the connection stays alive for as
	// long as the client keeps responding.
	//
	// Example:
	//   - At t=9 the server sends a ping; the initial read deadline is t=10.
	//   - At t=10 the client answers with a pong and the read deadline moves to t=20.
	//   - At t=18 the server sends the next ping; a pong at t=19 moves the read
	//     deadline to t=29.
	//
	// Failure case:
	// if the client stops responding, the ping sent at t=9 gets no pong by t=10
	// and the server closes the connection.
	PingPeriod = (PongWait * 9) / 10

	// WriteWait is the timeout applied to write operations. A write that does
	// not complete within this duration fails with a timeout error, so a slow
	// or unresponsive client cannot block the writer indefinitely. This bounds
	// resource usage and lets the server handle delayed writes gracefully.
	WriteWait = 10 * time.Second
)

type Config struct {
MaxSubscriptionsPerConnection uint64
MaxResponsesPerSecond uint64
Expand Down
57 changes: 57 additions & 0 deletions engine/access/rest/websockets/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package websockets

import (
"time"

"github.com/gorilla/websocket"
)

// WebsocketConnection abstracts the websocket connection operations used by
// the websocket Controller, so the concrete connection can be substituted
// (for example by a generated mock in tests).
type WebsocketConnection interface {
	// Message exchange.

	// ReadJSON reads a message from the connection into v.
	ReadJSON(v interface{}) error
	// WriteJSON writes v to the connection as a message.
	WriteJSON(v interface{}) error
	// WriteControl writes a control message of the given type, bounded by deadline.
	WriteControl(messageType int, deadline time.Time) error

	// Deadlines and keepalive handling.

	// SetReadDeadline sets the deadline for read operations.
	SetReadDeadline(deadline time.Time) error
	// SetWriteDeadline sets the deadline for write operations.
	SetWriteDeadline(deadline time.Time) error
	// SetPongHandler registers the handler invoked for pong messages from the peer.
	SetPongHandler(h func(string) error)

	// Teardown.

	// Close closes the connection.
	Close() error
}

// WebsocketConnectionImpl is the WebsocketConnection implementation backed by
// a gorilla *websocket.Conn. Every method forwards to the wrapped connection.
type WebsocketConnectionImpl struct {
	conn *websocket.Conn
}

// Compile-time check that *WebsocketConnectionImpl satisfies WebsocketConnection.
var _ WebsocketConnection = (*WebsocketConnectionImpl)(nil)

// NewWebsocketConnection wraps conn in a WebsocketConnectionImpl.
func NewWebsocketConnection(conn *websocket.Conn) *WebsocketConnectionImpl {
	return &WebsocketConnectionImpl{conn: conn}
}

// ReadJSON forwards to the wrapped connection's ReadJSON, reading into v, and
// returns its error unchanged.
func (c *WebsocketConnectionImpl) ReadJSON(v interface{}) error {
	return c.conn.ReadJSON(v)
}

// WriteJSON forwards v to the wrapped connection's WriteJSON and returns its
// error unchanged.
func (c *WebsocketConnectionImpl) WriteJSON(v interface{}) error {
	return c.conn.WriteJSON(v)
}

// WriteControl writes a control message of the given type to the wrapped
// connection with the given deadline. The message is always sent without a
// payload (nil data).
func (c *WebsocketConnectionImpl) WriteControl(messageType int, deadline time.Time) error {
	return c.conn.WriteControl(messageType, nil, deadline)
}

// Close closes the wrapped connection and returns the resulting error, if any.
func (c *WebsocketConnectionImpl) Close() error {
	return c.conn.Close()
}

// SetReadDeadline sets the read deadline on the wrapped connection.
func (c *WebsocketConnectionImpl) SetReadDeadline(deadline time.Time) error {
	return c.conn.SetReadDeadline(deadline)
}

// SetWriteDeadline sets the write deadline on the wrapped connection.
func (c *WebsocketConnectionImpl) SetWriteDeadline(deadline time.Time) error {
	return c.conn.SetWriteDeadline(deadline)
}

// SetPongHandler registers h on the wrapped connection as the handler for
// pong messages received from the peer.
func (c *WebsocketConnectionImpl) SetPongHandler(h func(string) error) {
	c.conn.SetPongHandler(h)
}
157 changes: 125 additions & 32 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,33 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/utils/concurrentmap"
)

type Controller struct {
logger zerolog.Logger
config Config
conn *websocket.Conn
communicationChannel chan interface{}
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProviderFactory dp.DataProviderFactory
logger zerolog.Logger
config Config
conn WebsocketConnection

communicationChannel chan interface{} // Channel for sending messages to the client.

dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProviderFactory dp.DataProviderFactory
}

func NewWebSocketController(
logger zerolog.Logger,
config Config,
conn *websocket.Conn,
conn WebsocketConnection,
dataProviderFactory dp.DataProviderFactory,
) *Controller {
return &Controller{
Expand All @@ -39,62 +43,131 @@ func NewWebSocketController(
}
}

// HandleConnection manages the WebSocket connection, adding context and error handling.
// HandleConnection manages the lifecycle of a WebSocket connection,
// including setup, message processing, and graceful shutdown.
//
// Parameters:
// - ctx: The context for controlling cancellation and timeouts.
func (c *Controller) HandleConnection(ctx context.Context) {
//TODO: configure the connection with ping-pong and deadlines
defer c.shutdownConnection()

// configuring the connection with appropriate read/write deadlines and handlers.
err := c.configureKeepalive()
if err != nil {
// TODO: add error handling here
c.logger.Error().Err(err).Msg("error configuring keepalive connection")

return
}

//TODO: spin up a response limit tracker routine
go c.readMessagesFromClient(ctx)
c.writeMessagesToClient(ctx)

// Track all goroutines in an errgroup so the first error returned by any of them is reported.
g, gCtx := errgroup.WithContext(ctx)

g.Go(func() error {
return c.readMessagesFromClient(gCtx)
})

g.Go(func() error {
return c.keepalive(gCtx)
})

g.Go(func() error {
return c.writeMessagesToClient(gCtx)
})

if err = g.Wait(); err != nil {
//TODO: add error handling here
c.logger.Error().Err(err).Msg("error detected in one of the goroutines")
}
}

// configureKeepalive prepares the WebSocket connection for keepalive handling
// by installing a read deadline and a pong handler.
//
// It performs two steps:
//  1. Sets the initial read deadline to now + PongWait. The pong handler only
//     extends the deadline after a pong has been received, so without this
//     initial deadline the server could wait forever on a client that never
//     sends a pong.
//  2. Registers a pong handler that pushes the read deadline out by PongWait
//     each time a pong message arrives from the client.
//
// No errors are expected during normal operation.
func (c *Controller) configureKeepalive() error {
	// Both the initial setup and the pong handler extend the deadline the same way.
	extendReadDeadline := func() error {
		return c.conn.SetReadDeadline(time.Now().Add(PongWait))
	}

	if err := extendReadDeadline(); err != nil {
		return fmt.Errorf("failed to set the initial read deadline: %w", err)
	}

	// The pong payload is not used; receiving a pong is what matters.
	c.conn.SetPongHandler(func(string) error {
		return extendReadDeadline()
	})

	return nil
}

// writeMessagesToClient reads messages from the communication channel and passes them on to a client WebSocket connection.
// The communication channel is filled by data providers. Besides, the response limit tracker is involved in
// write message regulation
func (c *Controller) writeMessagesToClient(ctx context.Context) {
//TODO: can it run forever? maybe we should cancel the ctx in the reader routine
//
// No errors are expected during normal operation. All errors are considered benign.
func (c *Controller) writeMessagesToClient(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return
case msg := <-c.communicationChannel:
return nil
case msg, ok := <-c.communicationChannel:
if !ok {
return fmt.Errorf("communication channel closed, no error occurred")
}
// TODO: handle 'response per second' limits

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
return fmt.Errorf("failed to set the write deadline: %w", err)
}
err := c.conn.WriteJSON(msg)
if err != nil {
c.logger.Error().Err(err).Msg("error writing to connection")
return fmt.Errorf("failed to write message to connection: %w", err)
}
}
}
}

// readMessagesFromClient continuously reads messages from a client WebSocket connection,
// processes each message, and handles actions based on the message type.
func (c *Controller) readMessagesFromClient(ctx context.Context) {
defer c.shutdownConnection()

//
// No errors are expected during normal operation. All errors are considered benign.
func (c *Controller) readMessagesFromClient(ctx context.Context) error {
for {
select {
case <-ctx.Done():
c.logger.Info().Msg("context canceled, stopping read message loop")
return
return nil
default:
msg, err := c.readMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
return
return nil
}
c.logger.Warn().Err(err).Msg("error reading message from client")
return
return fmt.Errorf("failed to read message from client: %w", err)
}

baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
_, validatedMsg, err := c.parseAndValidateMessage(msg)
if err != nil {
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
return
return fmt.Errorf("failed to parse and validate client message: %w", err)
}

if err := c.handleAction(ctx, validatedMsg); err != nil {
c.logger.Warn().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
return fmt.Errorf("failed to handle message action: %w", err)
}
}
}
Expand Down Expand Up @@ -139,7 +212,6 @@ func (c *Controller) parseAndValidateMessage(message json.RawMessage) (models.Ba
validatedMsg = listMsg

default:
c.logger.Debug().Str("action", baseMsg.Action).Msg("unknown action type")
return baseMsg, nil, fmt.Errorf("unknown action type: %s", baseMsg.Action)
}

Expand Down Expand Up @@ -202,12 +274,12 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
}

func (c *Controller) shutdownConnection() {
defer close(c.communicationChannel)
defer func(conn *websocket.Conn) {
defer func() {
if err := c.conn.Close(); err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
}
}(c.conn)
// TODO: safe closing communicationChannel will be included as a part of PR #6642
}()

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
Expand All @@ -219,3 +291,24 @@ func (c *Controller) shutdownConnection() {

c.dataProviders.Clear()
}

// keepalive periodically sends a ping control message to the client, once per
// PingPeriod, to keep the WebSocket connection alive and avoid timeouts.
//
// It returns nil when ctx is done, and a wrapped error as soon as a ping
// cannot be written.
//
// No errors are expected during normal operation. All errors are considered benign.
func (c *Controller) keepalive(ctx context.Context) error {
	ticker := time.NewTicker(PingPeriod)
	defer ticker.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return nil
		case <-ticker.C:
			// Each ping gets its own write deadline, WriteWait from now.
			deadline := time.Now().Add(WriteWait)
			if err := c.conn.WriteControl(websocket.PingMessage, deadline); err != nil {
				return fmt.Errorf("failed to write ping message: %w", err)
			}
		}
	}
}
Loading
Loading