Skip to content

Commit

Permalink
Add support for error responses in appservice websocket commands
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Jul 16, 2021
1 parent 5993d62 commit 3ff1d9c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
4 changes: 2 additions & 2 deletions appservice/appservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package appservice

import (
"encoding/json"
"errors"
"fmt"
"html/template"
Expand Down Expand Up @@ -114,14 +113,15 @@ type AppService struct {
ws *websocket.Conn
StopWebsocket func(error)
WebsocketCommands chan WebsocketCommand
websocketRequests map[int]chan<- json.RawMessage
websocketRequests map[int]chan<- *WebsocketCommand
websocketRequestsLock sync.RWMutex
websocketRequestID int32
}

func (as *AppService) PrepareWebsocket() {
if as.WebsocketCommands == nil {
as.WebsocketCommands = make(chan WebsocketCommand, 32)
as.websocketRequests = make(map[int]chan<- *WebsocketCommand)
}
}

Expand Down
34 changes: 27 additions & 7 deletions appservice/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ func (as *AppService) SendWebsocket(cmd *WebsocketRequest) error {
return as.ws.WriteJSON(cmd)
}

func (as *AppService) addWebsocketResponseWaiter(reqID int, waiter chan<- json.RawMessage) {
func (as *AppService) addWebsocketResponseWaiter(reqID int, waiter chan<- *WebsocketCommand) {
as.websocketRequestsLock.Lock()
as.websocketRequests[reqID] = waiter
as.websocketRequestsLock.Unlock()
}

func (as *AppService) removeWebsocketResponseWaiter(reqID int, waiter chan<- json.RawMessage) {
func (as *AppService) removeWebsocketResponseWaiter(reqID int, waiter chan<- *WebsocketCommand) {
as.websocketRequestsLock.Lock()
existingWaiter, ok := as.websocketRequests[reqID]
if ok && existingWaiter == waiter {
Expand All @@ -134,19 +134,39 @@ func (as *AppService) removeWebsocketResponseWaiter(reqID int, waiter chan<- jso
as.websocketRequestsLock.Unlock()
}

type ErrorResponse struct {
Code string `json:"code"`
Message string `json:"message"`
}

func (er *ErrorResponse) Error() string {
return fmt.Sprintf("%s: %s", er.Code, er.Message)
}

func (as *AppService) RequestWebsocket(ctx context.Context, cmd *WebsocketRequest, response interface{}) error {
cmd.ReqID = int(atomic.AddInt32(&as.websocketRequestID, 1))
respChan := make(chan json.RawMessage, 1)
respChan := make(chan *WebsocketCommand, 1)
as.addWebsocketResponseWaiter(cmd.ReqID, respChan)
defer as.removeWebsocketResponseWaiter(cmd.ReqID, respChan)
err := as.SendWebsocket(cmd)
if err != nil {
return err
}
select {
case data := <-respChan:
if response != nil {
return json.Unmarshal(data, &response)
case resp := <-respChan:
if resp.Command == "error" {
var respErr ErrorResponse
err = json.Unmarshal(resp.Data, &respErr)
if err != nil {
return fmt.Errorf("failed to parse error JSON: %w", err)
}
return &respErr
} else if response != nil {
err = json.Unmarshal(resp.Data, &response)
if err != nil {
return fmt.Errorf("failed to parse response JSON: %w", err)
}
return nil
} else {
return nil
}
Expand Down Expand Up @@ -177,7 +197,7 @@ func (as *AppService) consumeWebsocket(stopFunc func(error), ws *websocket.Conn)
respChan, ok := as.websocketRequests[msg.ReqID]
if ok {
select {
case respChan <- msg.Data:
case respChan <- &msg.WebsocketCommand:
default:
as.Log.Warnln("Failed to handle response to %d: channel didn't accept response", msg.ReqID)
}
Expand Down

0 comments on commit 3ff1d9c

Please sign in to comment.