diff --git a/exchanges/okx/okx.go b/exchanges/okx/okx.go index 5c7a50003..0982f4103 100644 --- a/exchanges/okx/okx.go +++ b/exchanges/okx/okx.go @@ -30,9 +30,6 @@ import ( type Okx struct { exchange.Base - // TODO: Remove this when the websocket multi-connection management integration is complete - WsResponseMultiplexer wsRequestDataChannelsMultiplexer - // WsRequestSemaphore channel is used to block write operation on the websocket connection to reduce contention; a kind of bounded parallelism. // it is made to hold up to 20 integers so that up to 20 write operations can be called over the websocket connection at a time. // and when the operation is completed the thread releases (consumes) one value from the channel so that the other waiting operation can enter. diff --git a/exchanges/okx/okx_types.go b/exchanges/okx/okx_types.go index 234d290b4..0ce794aa4 100644 --- a/exchanges/okx/okx_types.go +++ b/exchanges/okx/okx_types.go @@ -2342,15 +2342,6 @@ type WebsocketDataResponse struct { Data []interface{} `json:"data"` } -type wsRequestInfo struct { - ID string - Chan chan *wsIncomingData - Event string - Channel string - InstrumentType string - InstrumentID string -} - type wsIncomingData struct { Event string `json:"event,omitempty"` Argument SubscriptionInfo `json:"arg,omitempty"` @@ -3191,16 +3182,6 @@ type FundingOrder struct { EarningCcy []string `json:"earningCcy,omitempty"` } -// wsRequestDataChannelsMultiplexer a single multiplexer instance to multiplex websocket messages multiplexer channels -type wsRequestDataChannelsMultiplexer struct { - // To Synchronize incoming messages coming through the websocket channel - WsResponseChannelsMap map[string]*wsRequestInfo - Register chan *wsRequestInfo - Unregister chan string - Message chan *wsIncomingData - shutdown chan bool -} - // wsSubscriptionParameters represents toggling boolean values for subscription parameters. type wsSubscriptionParameters struct { InstrumentType bool diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go index 68d1c3fa6..11c622165 100644 --- a/exchanges/okx/okx_websocket.go +++ b/exchanges/okx/okx_websocket.go @@ -8,13 +8,13 @@ import ( "fmt" "hash/crc32" "net/http" + "slices" "strconv" "strings" "time" "github.com/buger/jsonparser" "github.com/gorilla/websocket" - "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" @@ -274,56 +274,16 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error { return err } base64Sign := crypto.Base64Encode(hmac) - request := WebsocketEventRequest{ - Operation: operationLogin, - Arguments: []WebsocketLoginData{ - { - APIKey: creds.Key, - Passphrase: creds.ClientID, - Timestamp: timeUnix, - Sign: base64Sign, - }, + args := []WebsocketLoginData{ + { + APIKey: creds.Key, + Passphrase: creds.ClientID, + Timestamp: timeUnix, + Sign: base64Sign, }, } - err = ok.Websocket.AuthConn.SendJSONMessage(request) - if err != nil { - return err - } - timer := time.NewTimer(ok.WebsocketResponseCheckTimeout) - randomID, err := common.GenerateRandomString(16) - if err != nil { - return fmt.Errorf("%w, generating random string for incoming websocket response failed", err) - } - wsResponse := make(chan *wsIncomingData) - ok.WsResponseMultiplexer.Register <- &wsRequestInfo{ - ID: randomID, - Chan: wsResponse, - Event: operationLogin, - } - ok.WsRequestSemaphore <- 1 - defer func() { - <-ok.WsRequestSemaphore - }() - defer func() { ok.WsResponseMultiplexer.Unregister <- randomID }() - for { - select { - case data := <-wsResponse: - if data.Event == operationLogin && data.Code == "0" { - ok.Websocket.SetCanUseAuthenticatedEndpoints(true) - return nil - } else if data.Event == "error" && - (data.Code == "60022" || data.Code == "60009") { - ok.Websocket.SetCanUseAuthenticatedEndpoints(false) - return fmt.Errorf("authentication failed with error: %v", ErrorCodes[data.Code]) - } - continue - case <-timer.C: - timer.Stop() - return fmt.Errorf("%s websocket connection: timeout waiting for response with an operation: %v", - ok.Name, - request.Operation) - } - } + var resp *wsIncomingData + return ok.SendWebsocketRequest("1", operationLogin, args, &resp) } // wsReadData sends msgs from public and auth websockets to data handler @@ -554,8 +514,14 @@ func (ok *Okx) WsHandleData(respRaw []byte) error { } return fmt.Errorf("%w unmarshalling %v", err, respRaw) } - if (resp.Event != "" && (resp.Event == "login" || resp.Event == "error")) || resp.Operation != "" { - ok.WsResponseMultiplexer.Message <- &resp + if resp.Event != "" && + (resp.Event == "login" || + (resp.Event == "error" && + slices.Contains([]string{"60022", "60023", "60024", "60026", "63999", "60032", "60011", "60009", "60005", "60021", "60031"}, resp.Code))) { + // find error codes and corresponding reasons: https://www.okx.com/docs-v5/en/#error-code-websocket-public + if !ok.Websocket.Match.IncomingWithData("1", respRaw) { + return fmt.Errorf("%s: %w to payload %v", ok.Name, errWebsocketDataNotMatchedWithID, string(respRaw)) + } return nil } if len(resp.Data) == 0 { @@ -1343,55 +1309,6 @@ func (ok *Okx) wsProcessPushData(data []byte, resp interface{}) error { return nil } -// Run this functions distributes websocket request responses to -func (m *wsRequestDataChannelsMultiplexer) Run() { - tickerData := time.NewTicker(time.Second) - for { - select { - case <-m.shutdown: - // We've consumed the shutdown, so create a new chan for subsequent runs - m.shutdown = make(chan bool) - return - case <-tickerData.C: - for x, myChan := range m.WsResponseChannelsMap { - if myChan == nil { - delete(m.WsResponseChannelsMap, x) - } - } - case id := <-m.Unregister: - delete(m.WsResponseChannelsMap, id) - case reg := <-m.Register: - m.WsResponseChannelsMap[reg.ID] = reg - case msg := <-m.Message: - if msg.ID != "" && m.WsResponseChannelsMap[msg.ID] != nil { - m.WsResponseChannelsMap[msg.ID].Chan <- msg - continue - } - for _, myChan := range m.WsResponseChannelsMap { - if (msg.Event == "error" || myChan.Event == operationLogin) && - (msg.Code == "60009" || msg.Code == "60022" || msg.Code == "0") && - strings.Contains(msg.Msg, myChan.Channel) { - myChan.Chan <- msg - continue - } else if msg.Event != myChan.Event || - msg.Argument.Channel != myChan.Channel || - msg.Argument.InstrumentType != myChan.InstrumentType || - msg.Argument.InstrumentID != myChan.InstrumentID { - continue - } - myChan.Chan <- msg - break - } - } - } -} - -// Shutdown causes the multiplexer to exit its Run loop -// All channels are left open, but websocket shutdown first will ensure no more messages block on multiplexer reading -func (m *wsRequestDataChannelsMultiplexer) Shutdown() { - close(m.shutdown) -} - // wsChannelSubscription sends a subscription or unsubscription request for different channels through the websocket stream. func (ok *Okx) wsChannelSubscription(operation, channel string, assetType asset.Item, pair currency.Pair, tInstrumentType, tInstrumentID, tUnderlying bool) error { if operation != operationSubscribe && operation != operationUnsubscribe { diff --git a/exchanges/okx/okx_wrapper.go b/exchanges/okx/okx_wrapper.go index 16472d35e..9a79d728c 100644 --- a/exchanges/okx/okx_wrapper.go +++ b/exchanges/okx/okx_wrapper.go @@ -187,14 +187,6 @@ func (ok *Okx) Setup(exch *config.Exchange) error { return err } - ok.WsResponseMultiplexer = wsRequestDataChannelsMultiplexer{ - WsResponseChannelsMap: make(map[string]*wsRequestInfo), - Register: make(chan *wsRequestInfo), - Unregister: make(chan string), - Message: make(chan *wsIncomingData), - shutdown: make(chan bool), - } - wsRunningEndpoint, err := ok.API.Endpoints.GetURL(exchange.WebsocketSpot) if err != nil { return err @@ -216,8 +208,6 @@ func (ok *Okx) Setup(exch *config.Exchange) error { return err } - go ok.WsResponseMultiplexer.Run() - if err := ok.Websocket.SetupNewConnection(stream.ConnectionSetup{ URL: okxAPIWebsocketPublicURL, ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, @@ -236,18 +226,6 @@ func (ok *Okx) Setup(exch *config.Exchange) error { }) } -// Shutdown calls Base.Shutdown and then shuts down the response multiplexer -func (ok *Okx) Shutdown() error { - if err := ok.Base.Shutdown(); err != nil { - return err - } - - // Must happen after the Websocket shutdown in Base.Shutdown, so there are no new blocking writes to the multiplexer - ok.WsResponseMultiplexer.Shutdown() - - return nil -} - // GetServerTime returns the current exchange server time. func (ok *Okx) GetServerTime(ctx context.Context, _ asset.Item) (time.Time, error) { return ok.GetSystemTime(ctx)