Skip to content

Commit

Permalink
Make All Socket Write Operations Atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvonthenen committed Jun 20, 2024
1 parent 5868b91 commit de5d0ff
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 79 deletions.
4 changes: 2 additions & 2 deletions pkg/client/interfaces/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func (o *ClientOptions) Parse() error {
return nil
}

func (c *ClientOptions) InspectMessage() bool {
return c.AutoFlushReplyDelta != 0
func (o *ClientOptions) InspectMessage() bool {
return o.AutoFlushReplyDelta != 0
}

func (o *PreRecordedTranscriptionOptions) Check() error {
Expand Down
203 changes: 128 additions & 75 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
)

type controlMessage struct {
Type string `json:"type"`
}

/*
NewForDemo creates a new websocket connection with all default options
Expand Down Expand Up @@ -120,34 +124,34 @@ func (c *Client) Connect() bool {
if c.retryCnt == 0 {
c.retryCnt = DefaultConnectRetry
}
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) != nil
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), true) != nil
}

// ConnectWithCancel performs a websocket connection with specified number of retries and providing a
// cancel function to stop the connection
func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool {
return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt) != nil
return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt, true) != nil
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool {
c.retry = true
c.ctx, c.ctxCancel = context.WithCancel(ctx)
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries)) != nil
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries), true) != nil
}

// AttemptReconnect performs a reconnect after failing retries and providing a cancel function
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool {
c.retry = true
return c.internalConnectWithCancel(ctx, ctxCancel, int(retries)) != nil
return c.internalConnectWithCancel(ctx, ctxCancel, int(retries), true) != nil
}

func (c *Client) internalConnect() *websocket.Conn {
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt))
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false)
}

//nolint:funlen // this is a complex function. keep as is
func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn {
klog.V(7).Infof("live.Connect() ENTER\n")

// set the context
Expand All @@ -162,6 +166,13 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
return nil
}

// lock conn access
if lock {
klog.V(3).Infof("Locking connection mutex\n")
c.muConn.Lock()
defer c.muConn.Unlock()
}

// if the connection is good, return it otherwise, attempt reconnect
if c.wsconn != nil {
select {
Expand All @@ -170,6 +181,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
return nil
default:

klog.V(7).Infof("Connection is good. Return object.")
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
return c.wsconn
Expand Down Expand Up @@ -228,10 +240,6 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
}
klog.V(5).Infof("Connecting to %s\n", url)

// a single connection attempt
// Note: not using defer here because we arent leaving the scope of the function
c.muConn.Lock()

// perform the websocket connection
ws, res, err := dialer.DialContext(c.ctx, url, myHeader)
if res != nil {
Expand All @@ -241,17 +249,13 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
if err != nil {
klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host)
klog.V(1).Infof("Dialer failed. Err: %v\n", err)
c.muConn.Unlock()
continue
}

// set the object to allow threads to function
c.wsconn = ws
c.retry = true

// unlock the connection
c.muConn.Unlock()

// kick off threads to listen for messages and ping/keepalive
go c.listen()
if c.cOptions.EnableKeepAlive {
Expand Down Expand Up @@ -282,6 +286,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
return nil
}

//nolint:funlen,gocyclo // this is a complex function. keep as is
func (c *Client) listen() {
klog.V(6).Infof("live.listen() ENTER\n")

Expand Down Expand Up @@ -312,6 +317,20 @@ func (c *Client) listen() {
// graceful close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, UseOfClosedSocket):
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
}

// fatal close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, FatalReadSocketErr):
Expand Down Expand Up @@ -456,19 +475,19 @@ func (c *Client) WriteBinary(byData []byte) error {
klog.V(7).Infof("live.WriteBinary() ENTER\n")

// doing a write, need to lock
c.muConn.Lock()
defer c.muConn.Unlock()

// get the connection
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")

return err
}

// doing a write, need to lock
c.muConn.Lock()
defer c.muConn.Unlock()

if err := ws.WriteMessage(
websocket.BinaryMessage,
byData,
Expand All @@ -492,16 +511,6 @@ managing the live transcription session on the Deepgram server.
func (c *Client) WriteJSON(payload interface{}) error {
klog.V(7).Infof("live.WriteJSON() ENTER\n")

// doing a write, need to lock
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteJSON() LEAVE\n")

return err
}

byData, err := json.Marshal(payload)
if err != nil {
klog.V(1).Infof("WriteJSON json.Marshal failed. Err: %v\n", err)
Expand All @@ -513,6 +522,16 @@ func (c *Client) WriteJSON(payload interface{}) error {
c.muConn.Lock()
defer c.muConn.Unlock()

// doing a write, need to lock
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteJSON() LEAVE\n")

return err
}

if err := ws.WriteMessage(
websocket.TextMessage,
byData,
Expand Down Expand Up @@ -548,27 +567,92 @@ func (c *Client) Write(p []byte) (int, error) {
return byteLen, nil
}

func (c *Client) KeepAlive() error {
klog.V(7).Infof("live.KeepAlive() ENTER\n")

err := c.WriteJSON(controlMessage{Type: "KeepAlive"})
if err != nil {
klog.V(1).Infof("Finalize failed. Err: %v\n", err)
klog.V(7).Infof("live.KeepAlive() LEAVE\n")

return err
}

klog.V(4).Infof("KeepAlive Succeeded\n")
klog.V(7).Infof("live.KeepAlive() LEAVE\n")

return err
}

func (c *Client) Finalize() error {
klog.V(7).Infof("live.Finalize() ENTER\n")
klog.V(7).Infof("live.KeepAlive() ENTER\n")

err := c.WriteJSON(controlMessage{Type: "Finalize"})
if err != nil {
klog.V(1).Infof("Finalize failed. Err: %v\n", err)
klog.V(7).Infof("live.Finalize() LEAVE\n")

return err
}

klog.V(4).Infof("Finalize Succeeded\n")
klog.V(7).Infof("live.Finalize() LEAVE\n")

return err
}

func (c *Client) closeStream(lock bool) error {
klog.V(7).Infof("live.closeStream() ENTER\n")

// doing a write, need to lock
if lock {
c.muConn.Lock()
defer c.muConn.Unlock()
}

err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"CloseStream\" }"))
if err != nil {
klog.V(1).Infof("WriteMessage failed. Err: %v\n", err)
klog.V(7).Infof("live.closeStream() LEAVE\n")

return err
}

klog.V(4).Infof("closeStream Succeeded\n")
klog.V(7).Infof("live.closeStream() LEAVE\n")

return err
}

func (c *Client) normalClosure(lock bool) error {
klog.V(7).Infof("live.normalClosure() ENTER\n")

// doing a write, need to lock
if lock {
c.muConn.Lock()
defer c.muConn.Unlock()
}

ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.Finalize() LEAVE\n")
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.normalClosure() LEAVE\n")

return err
}

// doing a write, need to lock
c.muConn.Lock()
defer c.muConn.Unlock()

err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Finalize\" }"))
err := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
switch err {
case websocket.ErrCloseSent:
klog.V(3).Infof("ErrCloseSent was sent. Err: %v\n", err)
case nil:
klog.V(4).Infof("normalClosure Succeeded\n")
default:
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err)
}

klog.V(4).Infof("Finalize Succeeded\n")
klog.V(7).Infof("live.Finalize() LEAVE\n")
klog.V(7).Infof("live.normalClosure() LEAVE\n")

return err
}
Expand All @@ -592,21 +676,11 @@ func (c *Client) closeWs(fatal bool) {

if c.wsconn != nil && !fatal {
// deepgram requires a close message to be sent
errDg := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"CloseStream\" }"))
if errDg == websocket.ErrCloseSent {
klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
} else if errDg != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
}
_ = c.closeStream(false)
time.Sleep(TerminationSleep) // allow time for server to register closure

// websocket protocol message
errProto := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if errProto == websocket.ErrCloseSent {
klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
} else if errProto != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
}
_ = c.normalClosure(false)
time.Sleep(TerminationSleep) // allow time for server to register closure
}

Expand Down Expand Up @@ -650,28 +724,14 @@ func (c *Client) ping() {
klog.V(5).Infof("Starting ping...")
counter++

ws := c.internalConnect()
if ws == nil {
klog.V(1).Infof("ping Connection is not valid\n")
klog.V(6).Infof("live.ping() LEAVE\n")
return
}

// doing a write, need to lock.
// Note: not using defer here because we arent leaving the scope of the function
c.muConn.Lock()

// deepgram keepalive message
klog.V(5).Infof("Sending Deepgram KeepAlive message...\n")
err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"KeepAlive\" }"))
err := c.KeepAlive()
if err == nil {
klog.V(5).Infof("Ping sent!")
} else {
klog.V(1).Infof("Failed to send Deepgram KeepAlive. Err: %v\n", err)
}

// release
c.muConn.Unlock()
}
}
}
Expand All @@ -692,13 +752,6 @@ func (c *Client) flush() {
klog.V(6).Infof("live.flush() LEAVE\n")
return
case <-ticker.C:
ws := c.internalConnect()
if ws == nil {
klog.V(1).Infof("flush Connection is not valid\n")
klog.V(6).Infof("live.flush() LEAVE\n")
return
}

// doing a read, need to lock.
c.muFinal.Lock()

Expand Down Expand Up @@ -762,7 +815,7 @@ func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse {
} else {
errorCode = UnknownDeepgramErr
errorNum = UnknownDeepgramErr
errorDesc = UnknownDeepgramErr
errorDesc = err.Error()
}

response := &msginterfaces.ErrorResponse{
Expand Down Expand Up @@ -818,7 +871,7 @@ func (c *Client) inspectMessage(mr *msginterfaces.MessageResponse) error {
klog.V(7).Infof("client.inspectMessage() ENTER\n")

sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript)
if len(mr.Channel.Alternatives) == 0 || len(sentence) == 0 {
if len(mr.Channel.Alternatives) == 0 || sentence == "" {
klog.V(7).Info("inspectMessage is empty\n")
klog.V(7).Infof("client.inspectMessage() LEAVE\n")
return nil
Expand Down
Loading

0 comments on commit de5d0ff

Please sign in to comment.