diff --git a/common/common.go b/common/common.go index 65c05172fbe..7fb5cf92756 100644 --- a/common/common.go +++ b/common/common.go @@ -72,6 +72,7 @@ var ( ErrUnknownError = errors.New("unknown error") ErrGettingField = errors.New("error getting field") ErrSettingField = errors.New("error setting field") + ErrParsingWSField = errors.New("error parsing websocket field") ) var ( diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index 854d7a46761..41dd2685f6e 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -1997,7 +1997,7 @@ func TestGetErrResp(t *testing.T) { seen++ switch seen { case 1: // no event - assert.ErrorIs(t, testErr, errParsingWSField, "Message with no event Should get correct error type") + assert.ErrorIs(t, testErr, common.ErrParsingWSField, "Message with no event should get correct error type") assert.ErrorContains(t, testErr, "'event'", "Message with no event error should contain missing field name") assert.ErrorContains(t, testErr, "nightjar", "Message with no event error should contain the message") case 2: // with {} for event diff --git a/exchanges/bitfinex/bitfinex_types.go b/exchanges/bitfinex/bitfinex_types.go index 69c506ea977..8005f7dfec4 100644 --- a/exchanges/bitfinex/bitfinex_types.go +++ b/exchanges/bitfinex/bitfinex_types.go @@ -14,7 +14,6 @@ var ( errSetCannotBeEmpty = errors.New("set cannot be empty") errNoSeqNo = errors.New("no sequence number") errParamNotAllowed = errors.New("param not allowed") - errParsingWSField = errors.New("error parsing WS field") errTickerInvalidSymbol = errors.New("invalid ticker symbol") errTickerInvalidResp = errors.New("invalid ticker response format") errTickerInvalidFieldCount = errors.New("invalid ticker response field count") diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index ee1b854b0b0..b0f41d2a434 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -446,7 +446,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { func (b *Bitfinex) handleWSEvent(respRaw []byte) error { event, err := jsonparser.GetUnsafeString(respRaw, "event") if err != nil { - return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, respRaw) } switch event { case wsEventSubscribed: @@ -454,7 +454,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventUnsubscribed: chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId") if err != nil { - return fmt.Errorf("%w 'chanId': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'chanId': %w from message: %s", common.ErrParsingWSField, err, respRaw) } err = b.Websocket.Match.RequireMatchWithData("unsubscribe:"+chanID, respRaw) if err != nil { @@ -477,7 +477,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventAuth: status, err := jsonparser.GetUnsafeString(respRaw, "status") if err != nil { - return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw) } if status == "OK" { var glob map[string]interface{} @@ -489,7 +489,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { } else { errCode, err := jsonparser.GetInt(respRaw, "code") if err != nil { - log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, respRaw) + log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, respRaw) } return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode) } @@ -499,7 +499,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventConf: status, err := jsonparser.GetUnsafeString(respRaw, "status") if err != nil { - return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw) } if status != "OK" { return fmt.Errorf("WS configure channel error; Status: %s", status) @@ -516,7 +516,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error { subID, err := jsonparser.GetUnsafeString(respRaw, "subId") if err != nil { - return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'subId': %w from message: %s", common.ErrParsingWSField, err, respRaw) } c := b.Websocket.GetSubscription(subID) @@ -526,7 +526,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error { chanID, err := jsonparser.GetInt(respRaw, "chanId") if err != nil { - return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Pairs) + return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, common.ErrParsingWSField, err, c.Channel, c.Pairs) } // Note: chanID's int type avoids conflicts with the string type subID key because of the type difference @@ -1815,19 +1815,19 @@ func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error { func (b *Bitfinex) getErrResp(resp []byte) error { event, err := jsonparser.GetUnsafeString(resp, "event") if err != nil { - return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, resp) + return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, resp) } if event != "error" { return nil } errCode, err := jsonparser.GetInt(resp, "code") if err != nil { - log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, resp) + log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, resp) } var apiErr error if msg, e2 := jsonparser.GetString(resp, "msg"); e2 != nil { - log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, errParsingWSField, e2, resp) + log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, common.ErrParsingWSField, e2, resp) apiErr = common.ErrUnknownError } else { apiErr = errors.New(msg) diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 8fe4f2fc1fa..612b8258840 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -33,7 +33,6 @@ const ( ) var ( - errParsingWSField = errors.New("error parsing WS field") errParsingWSPair = errors.New("unable to parse currency pair from wsResponse.Channel") errChannelHyphens = errors.New("channel name does not contain exactly 0 or 2 hyphens") errChannelUnderscores = errors.New("channel name does not contain exactly 2 underscores") @@ -102,7 +101,7 @@ func (b *Bitstamp) wsReadData() { func (b *Bitstamp) wsHandleData(respRaw []byte) error { event, err := jsonparser.GetUnsafeString(respRaw, "event") if err != nil { - return fmt.Errorf("%w `event`: %w", errParsingWSField, err) + return fmt.Errorf("%w `event`: %w", common.ErrParsingWSField, err) } event = strings.TrimPrefix(event, "bts:") @@ -132,7 +131,7 @@ func (b *Bitstamp) wsHandleData(respRaw []byte) error { func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error { channel, err := jsonparser.GetUnsafeString(respRaw, "channel") if err != nil { - return fmt.Errorf("%w `channel`: %w", errParsingWSField, err) + return fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err) } event = strings.TrimSuffix(event, "scription_succeeded") return b.Websocket.Match.RequireMatchWithData(event+":"+channel, respRaw) @@ -402,7 +401,7 @@ func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, err func (b *Bitstamp) parseChannelName(respRaw []byte) (string, currency.Pair, error) { channel, err := jsonparser.GetUnsafeString(respRaw, "channel") if err != nil { - return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", errParsingWSField, err) + return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err) } authParts := strings.Split(channel, "-") diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 2babae23b81..52bff477868 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -27,6 +27,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" + "github.com/thrasher-corp/gocryptotrader/exchanges/trade" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket" @@ -1221,6 +1222,41 @@ func TestWsHandleData(t *testing.T) { testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", k.wsHandleData) } +func TestWSProcessTrades(t *testing.T) { + t.Parallel() + + k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + require.NoError(t, testexch.Setup(k), "Test instance Setup must not error") + err := k.Websocket.AddSubscriptions(k.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{spotTestPair}, Channel: subscription.AllTradesChannel, Key: 18788}) + require.NoError(t, err, "AddSubscriptions must not error") + testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", k.wsHandleData) + close(k.Websocket.DataHandler) + + invalid := []any{"trades", []any{[]interface{}{"95873.80000", "0.00051182", "1708731380.3791859"}}} + pair := currency.NewPair(currency.XBT, currency.USD) + err = k.wsProcessTrades(invalid, pair) + require.ErrorContains(t, err, "unexpected trade data length") + + expJSON := []string{ + `{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"BUY","Price":95873.80000,"Amount":0.00051182,"Timestamp":"2025-02-23T23:29:40.379185914Z"}`, + `{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"SELL","Price":95940.90000,"Amount":0.00011069,"Timestamp":"2025-02-24T02:01:12.853682041Z"}`, + } + require.Len(t, k.Websocket.DataHandler, len(expJSON), "Must see correct number of trades") + for resp := range k.Websocket.DataHandler { + switch v := resp.(type) { + case trade.Data: + i := 1 - len(k.Websocket.DataHandler) + exp := trade.Data{Exchange: k.Name, CurrencyPair: spotTestPair} + require.NoErrorf(t, json.Unmarshal([]byte(expJSON[i]), &exp), "Must not error unmarshalling json %d: %s", i, expJSON[i]) + require.Equalf(t, exp, v, "Trade [%d] must be correct", i) + case error: + t.Error(v) + default: + t.Errorf("Unexpected type in DataHandler: %T (%s)", v, v) + } + } +} + func TestWsOpenOrders(t *testing.T) { t.Parallel() k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 5b276055181..f3a64540bba 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -79,7 +79,6 @@ func init() { var ( authToken string - errParsingWSField = errors.New("error parsing WS field") errCancellingOrder = errors.New("error cancelling order") errSubPairMissing = errors.New("pair missing from subscription response") errInvalidChecksum = errors.New("invalid checksum") @@ -531,7 +530,9 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { if !ok { return errors.New("received invalid trade data") } - if !k.IsSaveTradeDataEnabled() { + saveTradeData := k.IsSaveTradeDataEnabled() + tradeFeed := k.IsTradeFeedEnabled() + if !saveTradeData && !tradeFeed { return nil } trades := make([]trade.Data, len(data)) @@ -540,24 +541,37 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { if !ok { return errors.New("unidentified trade data received") } - timeData, err := strconv.ParseFloat(t[2].(string), 64) + if len(t) < 4 { + return fmt.Errorf("%w; unexpected trade data length: %d", common.ErrParsingWSField, len(t)) + } + ts, ok := t[2].(string) + if !ok { + return common.GetTypeAssertError("string", t[2], "trade.time") + } + timeData, err := strconv.ParseFloat(ts, 64) if err != nil { return err } - - price, err := strconv.ParseFloat(t[0].(string), 64) + p, ok := t[0].(string) + if !ok { + return common.GetTypeAssertError("string", t[0], "trade.price") + } + price, err := strconv.ParseFloat(p, 64) if err != nil { return err } - - amount, err := strconv.ParseFloat(t[1].(string), 64) + v, ok := t[1].(string) + if !ok { + return common.GetTypeAssertError("string", t[1], "trade.volume") + } + amount, err := strconv.ParseFloat(v, 64) if err != nil { return err } var tSide = order.Buy s, ok := t[3].(string) if !ok { - return common.GetTypeAssertError("string", t[3], "side") + return common.GetTypeAssertError("string", t[3], "trade.side") } if s == "s" { tSide = order.Sell @@ -573,7 +587,15 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { Side: tSide, } } - return trade.AddTradesToBuffer(k.Name, trades...) + if tradeFeed { + for i := range trades { + k.Websocket.DataHandler <- trades[i] + } + } + if saveTradeData { + return trade.AddTradesToBuffer(k.Name, trades...) + } + return nil } // wsProcessOrderBook handles both partial and full orderbook updates @@ -1331,7 +1353,7 @@ func (k *Kraken) wsCancelOrder(orderID string) error { status, err := jsonparser.GetUnsafeString(resp, "status") if err != nil { - return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, resp) + return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, resp) } else if status == "ok" { return nil } diff --git a/exchanges/kraken/testdata/wsAllTrades.json b/exchanges/kraken/testdata/wsAllTrades.json new file mode 100644 index 00000000000..db8626061ec --- /dev/null +++ b/exchanges/kraken/testdata/wsAllTrades.json @@ -0,0 +1,2 @@ +[119930881,[["95873.80000","0.00051182","1740353380.379186","b","l",""]],"trade","XBT/USD"] +[119930881,[["95940.90000","0.00011069","1740362472.853682","s","l",""]],"trade","XBT/USD"] diff --git a/testdata/configtest.json b/testdata/configtest.json index 2d7a7624adf..bcfb2bc4a40 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -1967,7 +1967,9 @@ }, "enabled": { "autoPairUpdates": true, - "websocketAPI": true + "websocketAPI": true, + "saveTradeData": false, + "tradeFeed": true } }, "bankAccounts": [