Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kraken: Fix sending trades to the websocket DataHandler #1813

Merged
merged 6 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (
ErrUnknownError = errors.New("unknown error")
ErrGettingField = errors.New("error getting field")
ErrSettingField = errors.New("error setting field")
ErrParsingWSField = errors.New("error parsing WS field")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ErrParsingWSField = errors.New("error parsing WS field")
ErrParsingWSField = errors.New("error parsing websocket field")

This wasn't your doing, but I'd prefer that we don't use acronyms for error messages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, I originally wrote "websocket" but changed it when I saw the original errors 😄 . Updated

)

var (
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,7 +1997,7 @@ func TestGetErrResp(t *testing.T) {
seen++
switch seen {
case 1: // no event
assert.ErrorIs(t, testErr, errParsingWSField, "Message with no event Should get correct error type")
assert.ErrorIs(t, testErr, common.ErrParsingWSField, "Message with no event Should get correct error type")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert.ErrorIs(t, testErr, common.ErrParsingWSField, "Message with no event Should get correct error type")
assert.ErrorIs(t, testErr, common.ErrParsingWSField, "Message with no event should get correct error type")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

assert.ErrorContains(t, testErr, "'event'", "Message with no event error should contain missing field name")
assert.ErrorContains(t, testErr, "nightjar", "Message with no event error should contain the message")
case 2: // with {} for event
Expand Down
1 change: 0 additions & 1 deletion exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errNoSeqNo = errors.New("no sequence number")
errParamNotAllowed = errors.New("param not allowed")
errParsingWSField = errors.New("error parsing WS field")
errTickerInvalidSymbol = errors.New("invalid ticker symbol")
errTickerInvalidResp = errors.New("invalid ticker response format")
errTickerInvalidFieldCount = errors.New("invalid ticker response field count")
Expand Down
20 changes: 10 additions & 10 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,15 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
event, err := jsonparser.GetUnsafeString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
switch event {
case wsEventSubscribed:
return b.handleWSSubscribed(respRaw)
case wsEventUnsubscribed:
chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w 'chanId': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'chanId': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
err = b.Websocket.Match.RequireMatchWithData("unsubscribe:"+chanID, respRaw)
if err != nil {
Expand All @@ -477,7 +477,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
case wsEventAuth:
status, err := jsonparser.GetUnsafeString(respRaw, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
if status == "OK" {
var glob map[string]interface{}
Expand All @@ -489,7 +489,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
} else {
errCode, err := jsonparser.GetInt(respRaw, "code")
if err != nil {
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, respRaw)
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, respRaw)
}
return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode)
}
Expand All @@ -499,7 +499,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
case wsEventConf:
status, err := jsonparser.GetUnsafeString(respRaw, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
if status != "OK" {
return fmt.Errorf("WS configure channel error; Status: %s", status)
Expand All @@ -516,7 +516,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
subID, err := jsonparser.GetUnsafeString(respRaw, "subId")
if err != nil {
return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'subId': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}

c := b.Websocket.GetSubscription(subID)
Expand All @@ -526,7 +526,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {

chanID, err := jsonparser.GetInt(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Pairs)
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, common.ErrParsingWSField, err, c.Channel, c.Pairs)
}

// Note: chanID's int type avoids conflicts with the string type subID key because of the type difference
Expand Down Expand Up @@ -1815,19 +1815,19 @@ func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error {
func (b *Bitfinex) getErrResp(resp []byte) error {
event, err := jsonparser.GetUnsafeString(resp, "event")
if err != nil {
return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, resp)
return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, resp)
}
if event != "error" {
return nil
}
errCode, err := jsonparser.GetInt(resp, "code")
if err != nil {
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, resp)
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, resp)
}

var apiErr error
if msg, e2 := jsonparser.GetString(resp, "msg"); e2 != nil {
log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, errParsingWSField, e2, resp)
log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, common.ErrParsingWSField, e2, resp)
apiErr = common.ErrUnknownError
} else {
apiErr = errors.New(msg)
Expand Down
7 changes: 3 additions & 4 deletions exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const (
)

var (
errParsingWSField = errors.New("error parsing WS field")
errParsingWSPair = errors.New("unable to parse currency pair from wsResponse.Channel")
errChannelHyphens = errors.New("channel name does not contain exactly 0 or 2 hyphens")
errChannelUnderscores = errors.New("channel name does not contain exactly 2 underscores")
Expand Down Expand Up @@ -102,7 +101,7 @@ func (b *Bitstamp) wsReadData() {
func (b *Bitstamp) wsHandleData(respRaw []byte) error {
event, err := jsonparser.GetUnsafeString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w `event`: %w", errParsingWSField, err)
return fmt.Errorf("%w `event`: %w", common.ErrParsingWSField, err)
}

event = strings.TrimPrefix(event, "bts:")
Expand Down Expand Up @@ -132,7 +131,7 @@ func (b *Bitstamp) wsHandleData(respRaw []byte) error {
func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
return fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err)
}
event = strings.TrimSuffix(event, "scription_succeeded")
return b.Websocket.Match.RequireMatchWithData(event+":"+channel, respRaw)
Expand Down Expand Up @@ -402,7 +401,7 @@ func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, err
func (b *Bitstamp) parseChannelName(respRaw []byte) (string, currency.Pair, error) {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err)
}

authParts := strings.Split(channel, "-")
Expand Down
36 changes: 36 additions & 0 deletions exchanges/kraken/kraken_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket"
Expand Down Expand Up @@ -1221,6 +1222,41 @@ func TestWsHandleData(t *testing.T) {
testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", k.wsHandleData)
}

func TestWSProcessTrades(t *testing.T) {
t.Parallel()

k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(k), "Test instance Setup must not error")
err := k.Websocket.AddSubscriptions(k.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{spotTestPair}, Channel: subscription.AllTradesChannel, Key: 18788})
require.NoError(t, err, "AddSubscriptions must not error")
testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", k.wsHandleData)
close(k.Websocket.DataHandler)

invalid := []any{"trades", []any{[]interface{}{"95873.80000", "0.00051182", "1708731380.3791859"}}}
pair := currency.NewPair(currency.XBT, currency.USD)
err = k.wsProcessTrades(invalid, pair)
require.ErrorContains(t, err, "unexpected trade data length")

expJSON := []string{
`{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"BUY","Price":95873.80000,"Amount":0.00051182,"Timestamp":"2025-02-23T23:29:40.379185914Z"}`,
`{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"SELL","Price":95940.90000,"Amount":0.00011069,"Timestamp":"2025-02-24T02:01:12.853682041Z"}`,
}
require.Len(t, k.Websocket.DataHandler, len(expJSON), "Must see correct number of trades")
for resp := range k.Websocket.DataHandler {
switch v := resp.(type) {
case trade.Data:
i := 1 - len(k.Websocket.DataHandler)
exp := trade.Data{Exchange: k.Name, CurrencyPair: spotTestPair}
require.NoErrorf(t, json.Unmarshal([]byte(expJSON[i]), &exp), "Must not error unmarshalling json %d: %s", i, expJSON[i])
require.Equalf(t, exp, v, "Trade [%d] should be correct", i)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
require.Equalf(t, exp, v, "Trade [%d] should be correct", i)
require.Equalf(t, exp, v, "Trade [%d] must be correct", i)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

case error:
t.Error(v)
default:
t.Errorf("Unexpected type in DataHandler: %T (%s)", v, v)
}
}
}

func TestWsOpenOrders(t *testing.T) {
t.Parallel()
k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
Expand Down
42 changes: 32 additions & 10 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func init() {

var (
authToken string
errParsingWSField = errors.New("error parsing WS field")
errCancellingOrder = errors.New("error cancelling order")
errSubPairMissing = errors.New("pair missing from subscription response")
errInvalidChecksum = errors.New("invalid checksum")
Expand Down Expand Up @@ -531,7 +530,9 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
if !ok {
return errors.New("received invalid trade data")
}
if !k.IsSaveTradeDataEnabled() {
saveTradeData := k.IsSaveTradeDataEnabled()
tradeFeed := k.IsTradeFeedEnabled()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OOS: Just highlighting this as trade feed enabled is not necessary as it's a subscription customisation. No change needed.

if !saveTradeData && !tradeFeed {
return nil
}
trades := make([]trade.Data, len(data))
Expand All @@ -540,24 +541,37 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
if !ok {
return errors.New("unidentified trade data received")
}
timeData, err := strconv.ParseFloat(t[2].(string), 64)
if len(t) < 4 {
return fmt.Errorf("%w; unexpected trade data length: %d", common.ErrParsingWSField, len(t))
}
ts, ok := t[2].(string)
if !ok {
return common.GetTypeAssertError("string", t[2], "trade.time")
}
timeData, err := strconv.ParseFloat(ts, 64)
if err != nil {
return err
}

price, err := strconv.ParseFloat(t[0].(string), 64)
p, ok := t[0].(string)
if !ok {
return common.GetTypeAssertError("string", t[0], "trade.price")
}
price, err := strconv.ParseFloat(p, 64)
if err != nil {
return err
}

amount, err := strconv.ParseFloat(t[1].(string), 64)
v, ok := t[1].(string)
if !ok {
return common.GetTypeAssertError("string", t[1], "trade.volume")
}
amount, err := strconv.ParseFloat(v, 64)
if err != nil {
return err
}
var tSide = order.Buy
s, ok := t[3].(string)
if !ok {
return common.GetTypeAssertError("string", t[3], "side")
return common.GetTypeAssertError("string", t[3], "trade.side")
}
if s == "s" {
tSide = order.Sell
Expand All @@ -573,7 +587,15 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
Side: tSide,
}
}
return trade.AddTradesToBuffer(k.Name, trades...)
if tradeFeed {
for i := range trades {
k.Websocket.DataHandler <- trades[i]
}
}
if saveTradeData {
return trade.AddTradesToBuffer(k.Name, trades...)
}
return nil
}

// wsProcessOrderBook handles both partial and full orderbook updates
Expand Down Expand Up @@ -1331,7 +1353,7 @@ func (k *Kraken) wsCancelOrder(orderID string) error {

status, err := jsonparser.GetUnsafeString(resp, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, resp)
return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, resp)
} else if status == "ok" {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions exchanges/kraken/testdata/wsAllTrades.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[119930881,[["95873.80000","0.00051182","1740353380.379186","b","l",""]],"trade","XBT/USD"]
[119930881,[["95940.90000","0.00011069","1740362472.853682","s","l",""]],"trade","XBT/USD"]
4 changes: 3 additions & 1 deletion testdata/configtest.json
Original file line number Diff line number Diff line change
Expand Up @@ -1967,7 +1967,9 @@
},
"enabled": {
"autoPairUpdates": true,
"websocketAPI": true
"websocketAPI": true,
"saveTradeData": false,
"tradeFeed": true
}
},
"bankAccounts": [
Expand Down
Loading