Skip to content

Commit

Permalink
Created websocket server (#104)
Browse files Browse the repository at this point in the history
* updated ui

* added websocket server for ui connections
  • Loading branch information
dvovk authored Feb 13, 2025
1 parent d5583b4 commit b6c3833
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 8 deletions.
1 change: 1 addition & 0 deletions api/api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func NewAPIHandler(
r.Get("/sessions/{sessionId}", r.GetSession)

// Erigon Node data
r.Get("/v2/sessions/{sessionId}/nodes/{nodeId}/ws", r.HandleWebSocket)
r.Get("/sessions/{sessionId}/nodes/{nodeId}/logs/{file}", r.Log)
r.Get("/sessions/{sessionId}/nodes/{nodeId}/dbs/*", r.Tables)
r.Get("/sessions/{sessionId}/nodes/{nodeId}/reorgs", r.ReOrg)
Expand Down
3 changes: 0 additions & 3 deletions api/bridge_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (
var wsBufferPool = new(sync.Pool)

func (h BridgeHandler) Bridge(w http.ResponseWriter, r *http.Request) {

//Sends a success Message to the Node client, to receive more information
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
Expand Down Expand Up @@ -131,8 +130,6 @@ func (h BridgeHandler) Bridge(w http.ResponseWriter, r *http.Request) {
continue
}

//fmt.Printf("Sending request %s\n", string(bytes))

requestMutex.Lock()
requestMap[rpcRequest.Id] = request
requestMutex.Unlock()
Expand Down
1 change: 1 addition & 0 deletions api/internal/end_points.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package internal
const (
HealthCheckEndPoint = "/healthcheck"
BridgeEndPoint = "/bridge"
WSEndPoint = "/ws"
)
191 changes: 191 additions & 0 deletions api/websocket_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package api

import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
)

const (
ActionSubscribe = "subscribe"
ActionUnsubscribe = "unsubscribe"
)

// SubscriptionResponse is the response sent back to the client after an action is processed.
type ClientResponse struct {
Status string `json:"status"`
Message string `json:"message,omitempty"`
Data *string `json:"data,omitempty"`
}

type WebsocketHandler struct {
mu sync.Mutex
writeQueue chan []byte
conn *websocket.Conn
closeChan chan struct{}
}

// **NewWebsocketHandler initializes WebsocketHandler**
func NewWebsocketHandler(conn *websocket.Conn) *WebsocketHandler {
handler := &WebsocketHandler{
writeQueue: make(chan []byte, 100),
conn: conn,
closeChan: make(chan struct{}),
}

go handler.startWriter() // Start dedicated writer goroutine
return handler
}

// **Sends response safely**
func (h *WebsocketHandler) sendResponse(response *ClientResponse) {
resp, err := json.Marshal(response)
if err != nil {
fmt.Printf("Error marshaling response: %v\n", err)
return
}

select {
case h.writeQueue <- resp:
default:
fmt.Println("Warning: writeQueue is full, dropping message")
}
}

// **Dedicated writer goroutine**
func (h *WebsocketHandler) startWriter() {
for {
select {
case msg := <-h.writeQueue:
h.mu.Lock()
err := h.conn.WriteMessage(websocket.TextMessage, msg)
h.mu.Unlock()

if err != nil {
fmt.Printf("Error writing response: %v\n", err)
return
}

case <-h.closeChan:
fmt.Println("Writer goroutine stopped")
return
}
}
}

// **Close WebSocket connection and stop writer**
func (h *WebsocketHandler) closeConnection() {
close(h.closeChan)
h.conn.Close()
}

// **WebSocket handler function**
func (h *APIHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
type wsMessage struct {
Service string `json:"service"`
Action string `json:"action"`
}

upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("WebSocket upgrade failed:", err)
return
}
defer conn.Close()

handler := NewWebsocketHandler(conn)
defer handler.closeConnection()

channel := make(chan []byte)

// **Goroutine to forward messages from the channel to the client**
go func() {
for {
select {
case <-r.Context().Done():
return
case <-handler.closeChan: // Graceful shutdown
return
case message := <-channel:
handler.sendResponse(&ClientResponse{
Status: "success",
Message: string(message),
})
}
}
}()

// **Enable Ping/Pong Handling**
conn.SetPongHandler(func(appData string) error {
return nil
})

go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-handler.closeChan:
return
case <-ticker.C:
handler.mu.Lock()
err := conn.WriteMessage(websocket.PingMessage, nil)
handler.mu.Unlock()

if err != nil {
fmt.Println("Ping failed, closing connection:", err)
handler.closeConnection()
return
}
}
}
}()

for {
_, msg, err := conn.ReadMessage()
if err != nil {
fmt.Println("Error reading message:", err)
break
}
fmt.Printf("Received: %s\n", msg)

client, err := h.findNodeClient(r)
if err != nil {
handler.sendResponse(&ClientResponse{
Status: "error",
Message: "Client not found: " + err.Error(),
})
return
}

var inMsg wsMessage
if err := json.Unmarshal(msg, &inMsg); err != nil {
handler.sendResponse(&ClientResponse{
Status: "error",
Message: "Invalid JSON: " + err.Error(),
})
continue
}

switch inMsg.Action {
case ActionSubscribe:
go client.Subscribe(r.Context(), channel, inMsg.Service)
case ActionUnsubscribe:
client.Unsubscribe(r.Context(), channel, inMsg.Service)
default:
handler.sendResponse(&ClientResponse{
Status: "error",
Message: "Unknown action " + inMsg.Action,
})
}
}
}
14 changes: 9 additions & 5 deletions cmd/diagnostics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ func main() {
}
}()

packagePath := "github.com/erigontech/erigonwatch"
version, err := GetPackageVersion(packagePath)
if err == nil {
fmt.Printf("Diagnostics version: %s\n", version)
}
printUIVersion()

fmt.Printf("Diagnostics UI is running on http://%s:%d\n", listenAddr, listenPort)
//open(fmt.Sprintf("http://%s:%d", listenAddr, listenPort))
Expand All @@ -81,6 +77,14 @@ func main() {
}
}

func printUIVersion() {
packagePath := "github.com/erigontech/erigonwatch"
version, err := GetPackageVersion(packagePath)
if err == nil {
fmt.Printf("Diagnostics version: %s\n", version)
}
}

// open opens the specified URL in the default browser of the user.
/*func open(url string) error {
var cmd string
Expand Down
3 changes: 3 additions & 0 deletions internal/erigon_node/erigon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,7 @@ type Client interface {
FindProfile(ctx context.Context, profile string) ([]byte, error)

fetch(ctx context.Context, method string, params url.Values) (*NodeRequest, error)

Subscribe(ctx context.Context, channel chan []byte, service string) error
Unsubscribe(ctx context.Context, channel chan []byte, service string) error
}
53 changes: 53 additions & 0 deletions internal/erigon_node/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package erigon_node

import (
"context"
)

func (c *NodeClient) Subscribe(ctx context.Context, channel chan []byte, service string) error {
request, err := c.fetch(ctx, "subscribe/"+service, nil)

if err != nil {
return err
}

for {
more, result, err := request.nextResult(ctx)

if err != nil {
return err
}

channel <- result

if !more {
break
}
}

return nil
}

func (c *NodeClient) Unsubscribe(ctx context.Context, channel chan []byte, service string) error {
request, err := c.fetch(ctx, "unsubscribe/"+service, nil)

if err != nil {
return err
}

for {
more, result, err := request.nextResult(ctx)

if err != nil {
return err
}

channel <- result

if !more {
break
}
}

return nil
}

0 comments on commit b6c3833

Please sign in to comment.