Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement keepalive routine with ping-ponging to ws connection in ws controller #6757

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
81ddee5
Added Websocket connection configurating
UlyanaAndrukhiv Nov 20, 2024
808b54b
Updated configureConnection and godoc
UlyanaAndrukhiv Nov 21, 2024
6c5ab5d
Adedd SetWriteDeadline before write operation
UlyanaAndrukhiv Nov 21, 2024
eec15e5
Set initital read deadline, updated godoc
UlyanaAndrukhiv Nov 21, 2024
fd567aa
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 21, 2024
098c10d
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 22, 2024
917bbde
Implemented ping-pong ws routine, refactored shutdownConnection
UlyanaAndrukhiv Nov 22, 2024
438b130
Merge branch 'UlyanaAndrukhiv/6638-ws-connection-configuring' of gith…
UlyanaAndrukhiv Nov 22, 2024
86cdb35
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into UlyanaAn…
UlyanaAndrukhiv Nov 25, 2024
ec4e247
Added more comments and updated godoc
UlyanaAndrukhiv Nov 25, 2024
eae6bbf
Moved constants to new websockets package according to comment
UlyanaAndrukhiv Nov 25, 2024
4e2d35c
Merge branch 'UlyanaAndrukhiv/6638-ws-connection-configuring' of gith…
UlyanaAndrukhiv Nov 25, 2024
9971188
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 25, 2024
6cd2841
Merged with UlyanaAndrukhiv/6638-ws-connection-configuring
UlyanaAndrukhiv Nov 25, 2024
c90d75f
Updated according to comments, added unit tests for ping-pong functio…
UlyanaAndrukhiv Nov 26, 2024
afc8648
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Nov 26, 2024
040a949
Updated WriteMessage to WriteControl for Ping messages, updated mocks…
UlyanaAndrukhiv Nov 27, 2024
357dc2f
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Nov 27, 2024
276ea7e
Added tests for keepalive, configure connection, graceful shutdown, a…
UlyanaAndrukhiv Nov 28, 2024
077c543
Merge branch 'UlyanaAndrukhiv/6639-ws-ping-pong' of github.com:The-K-…
UlyanaAndrukhiv Nov 28, 2024
21259ce
Added happy case test for keepalive
UlyanaAndrukhiv Nov 28, 2024
1f5728d
Updated unit test for keep alive
UlyanaAndrukhiv Nov 28, 2024
f384b0a
Removed sendPing abstraction, updated godoc according to comments
UlyanaAndrukhiv Dec 2, 2024
66d0607
Updated last commit
UlyanaAndrukhiv Dec 2, 2024
3cfe98b
Extended godoc
UlyanaAndrukhiv Dec 2, 2024
3285b23
Updated controller according to comments
UlyanaAndrukhiv Dec 3, 2024
d58446d
Moved ping pong constants
UlyanaAndrukhiv Dec 3, 2024
f3d0bb8
Removed unused errorChannel for controller
UlyanaAndrukhiv Dec 3, 2024
9d07387
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 3, 2024
2e8f1bc
Updated controller and tests according to comments
UlyanaAndrukhiv Dec 4, 2024
08c902b
Merged with master
UlyanaAndrukhiv Dec 4, 2024
c38fcf5
Renamed method, updated tests
UlyanaAndrukhiv Dec 4, 2024
d0ccb45
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 5, 2024
5aee1ca
Refactored logging the error message for controller
UlyanaAndrukhiv Dec 5, 2024
17d40b6
Updated HandleConnection to always shut down the connection
UlyanaAndrukhiv Dec 5, 2024
29fe648
Updated behaviour of controller for context canceled
UlyanaAndrukhiv Dec 5, 2024
7012c3a
Cleaned up the code
UlyanaAndrukhiv Dec 5, 2024
83f070f
Updated duration according to comment
UlyanaAndrukhiv Dec 5, 2024
8ca674f
Merge branch 'UlyanaAndrukhiv/6639-ws-ping-pong' of github.com:The-K-…
UlyanaAndrukhiv Dec 5, 2024
f07807e
Added comment for test according suggestion
UlyanaAndrukhiv Dec 5, 2024
08e5f73
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 6, 2024
a044577
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Dec 16, 2024
cfad79f
Updated godoc to latest version, added suggestions to websocket contr…
UlyanaAndrukhiv Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 189 additions & 29 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/gorilla/websocket"
Expand All @@ -16,13 +18,31 @@ import (
"github.com/onflow/flow-go/utils/concurrentmap"
)

const (
// PingPeriod defines the interval at which ping messages are sent to the client.
// This value must be less than pongWait.
PingPeriod = (pongWait * 9) / 10

// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second

// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
)

type Controller struct {
logger zerolog.Logger
config Config
conn *websocket.Conn
communicationChannel chan interface{}
logger zerolog.Logger
config Config
conn *websocket.Conn

communicationChannel chan interface{} // Channel for sending messages to the client.
errorChannel chan error // Channel for reporting errors.

dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProvidersFactory *dp.Factory

shutdownOnce sync.Once // Ensures shutdown is only called once
shutdown bool // Indicates if the controller is shutting down.
}

func NewWebSocketController(
Expand All @@ -37,67 +57,166 @@ func NewWebSocketController(
config: config,
conn: conn,
communicationChannel: make(chan interface{}), //TODO: should it be buffered chan?
errorChannel: make(chan error, 1), // Buffered error channel to hold one error.
dataProviders: concurrentmap.New[uuid.UUID, dp.DataProvider](),
dataProvidersFactory: dp.NewDataProviderFactory(logger, streamApi, streamConfig),
}
}

// HandleConnection manages the WebSocket connection, adding context and error handling.
// HandleConnection manages the lifecycle of a WebSocket connection,
// including setup, message processing, and graceful shutdown.
//
// Parameters:
// - ctx: The context for controlling cancellation and timeouts.
func (c *Controller) HandleConnection(ctx context.Context) {
//TODO: configure the connection with ping-pong and deadlines
defer close(c.errorChannel)
// configuring the connection with appropriate read/write deadlines and handlers.
err := c.configureConnection()
if err != nil {
// TODO: add error handling here
c.logger.Error().Err(err).Msg("error configuring connection")
c.shutdownConnection()
return
}

//TODO: spin up a response limit tracker routine
go c.readMessagesFromClient(ctx)
c.writeMessagesToClient(ctx)

// for track all goroutines and error handling
var wg sync.WaitGroup

c.startProcess(&wg, ctx, c.readMessagesFromClient)
c.startProcess(&wg, ctx, c.keepalive)
c.startProcess(&wg, ctx, c.writeMessagesToClient)

// Wait for context cancellation or errors from goroutines.
select {
case err := <-c.errorChannel:
c.logger.Error().Err(err).Msg("error detected in one of the goroutines")
//TODO: add error handling here
c.shutdownConnection()
case <-ctx.Done():
// Context canceled, shut down gracefully
c.shutdownConnection()
}

// Ensure all goroutines finish execution.
wg.Wait()
}

// startProcess is a helper function to start a goroutine for a given process
// and ensure it is tracked via a sync.WaitGroup.
//
// Parameters:
// - wg: The wait group to track goroutines.
// - ctx: The context for cancellation.
// - process: The function to run in a new goroutine.
//
// No errors are expected during normal operation.
func (c *Controller) startProcess(wg *sync.WaitGroup, ctx context.Context, process func(context.Context) error) {
wg.Add(1)

go func() {
defer wg.Done()

err := process(ctx)
if err != nil {
// Check if shutdown has already been called, to avoid multiple shutdowns
if c.shutdown {
c.logger.Warn().Err(err).Msg("error detected after shutdown initiated, ignoring")
return
}

c.errorChannel <- err
}
}()
}

// configureConnection sets up the WebSocket connection with a read deadline
// and a handler for receiving pong messages from the client.
//
// The function does the following:
// 1. Sets an initial read deadline to ensure the server doesn't wait indefinitely
// for a pong message from the client. If no message is received within the
// specified `pongWait` duration, the connection will be closed.
// 2. Establishes a Pong handler that resets the read deadline every time a pong
// message is received from the client, allowing the server to continue waiting
// for further pong messages within the new deadline.
func (c *Controller) configureConnection() error {
// Set the initial read deadline for the first pong message
// The Pong handler itself only resets the read deadline after receiving a Pong.
// It doesn't set an initial deadline. The initial read deadline is crucial to prevent the server from waiting
// forever if the client doesn't send Pongs.
if err := c.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
return fmt.Errorf("failed to set the initial read deadline: %w", err)
}
// Establish a Pong handler which sets the handler for pong messages received from the peer.
c.conn.SetPongHandler(func(string) error {
return c.conn.SetReadDeadline(time.Now().Add(pongWait))
})

return nil
}

// writeMessagesToClient reads a messages from communication channel and passes them on to a client WebSocket connection.
// The communication channel is filled by data providers. Besides, the response limit tracker is involved in
// write message regulation
func (c *Controller) writeMessagesToClient(ctx context.Context) {
//TODO: can it run forever? maybe we should cancel the ctx in the reader routine
//
// No errors are expected during normal operation.
func (c *Controller) writeMessagesToClient(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return
return nil
case msg := <-c.communicationChannel:
// TODO: handle 'response per second' limits

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
c.logger.Error().Err(err).Msg("failed to set the write deadline")
return err
}
err := c.conn.WriteJSON(msg)
if err != nil {
c.logger.Error().Err(err).Msg("error writing to connection")
return err
}
}
}
}

// readMessagesFromClient continuously reads messages from a client WebSocket connection,
// processes each message, and handles actions based on the message type.
func (c *Controller) readMessagesFromClient(ctx context.Context) {
defer c.shutdownConnection()

//
// No errors are expected during normal operation.
func (c *Controller) readMessagesFromClient(ctx context.Context) error {
for {
select {
case <-ctx.Done():
c.logger.Info().Msg("context canceled, stopping read message loop")
return
return nil
default:
msg, err := c.readMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
return
return nil
}
c.logger.Warn().Err(err).Msg("error reading message from client")
return
return err
}

baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
if err != nil {
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
return
return err
}

if err := c.handleAction(ctx, validatedMsg); err != nil {
c.logger.Warn().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
return err
}
}
}
Expand Down Expand Up @@ -193,20 +312,61 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
}

func (c *Controller) shutdownConnection() {
defer close(c.communicationChannel)
defer func(conn *websocket.Conn) {
if err := c.conn.Close(); err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
c.shutdownOnce.Do(func() {
c.shutdown = true

defer close(c.communicationChannel)
defer func(conn *websocket.Conn) {
if err := c.conn.Close(); err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
}
}(c.conn)

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
return nil
})
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
}
}(c.conn)

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
return nil
c.dataProviders.Clear()
})
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
}

// keepalive sends a ping message periodically to keep the WebSocket connection alive
// and avoid timeouts.
//
// No errors are expected during normal operation.
func (c *Controller) keepalive(ctx context.Context) error {
pingTicker := time.NewTicker(PingPeriod)
defer pingTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-pingTicker.C:
if err := c.sendPing(); err != nil {
// Log error and exit the loop on failure
c.logger.Error().Err(err).Msg("failed to send ping")
return err
}
}
}
}

// sendPing sends a periodic ping message to the WebSocket client to keep the connection alive.
//
// No errors are expected during normal operation.
func (c *Controller) sendPing() error {
if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
return fmt.Errorf("failed to set the write deadline for ping: %w", err)
}

c.dataProviders.Clear()
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return fmt.Errorf("failed to write ping message: %w", err)
}

return nil
}
6 changes: 2 additions & 4 deletions engine/access/rest/websockets/legacy/websocket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/atomic"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
Expand All @@ -23,9 +24,6 @@ const (
// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
)
Expand Down Expand Up @@ -111,7 +109,7 @@ func (wsController *WebsocketController) wsErrorHandler(err error) {
// If an error occurs or the subscription channel is closed, it handles the error or termination accordingly.
// The function uses a ticker to periodically send ping messages to the client to maintain the connection.
func (wsController *WebsocketController) writeEvents(sub subscription.Subscription) {
ticker := time.NewTicker(pingPeriod)
ticker := time.NewTicker(websockets.PingPeriod)
defer ticker.Stop()

blocksSinceLastMessage := uint64(0)
Expand Down
Loading