Skip to content

Commit

Permalink
Subscriptions: Fix no enabled pairs for one asset (#1792)
Browse files Browse the repository at this point in the history
If one asset is enabled but has no enabled pairs, we should ignore that
asset, even for non-pair related subscriptions.
That matches the existing code, but wasn't happening in the context of
asset.All subscriptions with just one asset in this state.
If a user wanted to have non-pair subscriptions, they should use
asset.Empty. Would expect other breakages with no pairs enabled, too.

Note: No enabled pairs for an enabled asset is a transient issue which
can occur due to enableOnePair racing against no available pairs. The
second run, available pairs would be populated and enableOnePair would
work. This situation could persist due to running with --dry or
containerised, though.

Fixes:
`
Okx websocket: subscription failure, myOrders all : subscription template did not generate the expected number of pair records for spread: Got 1;
Expected 0
`

Relates to #1420
  • Loading branch information
gbjk authored Mar 6, 2025
1 parent 0796e44 commit d069ff2
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 44 deletions.
30 changes: 17 additions & 13 deletions exchanges/subscription/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type mockEx struct {
pairs currency.Pairs
pairs assetPairs
assets asset.Items
tpl string
auth bool
Expand All @@ -23,29 +23,26 @@ type mockEx struct {
}

func newMockEx() *mockEx {
pairs := currency.Pairs{btcusdtPair, ethusdcPair}
for _, b := range []currency.Code{currency.LTC, currency.XRP, currency.TRX} {
for _, q := range []currency.Code{currency.USDT, currency.USDC} {
pairs = append(pairs, currency.NewPair(b, q))
}
}

return &mockEx{
assets: asset.Items{asset.Spot, asset.Futures, asset.Index},
pairs: pairs,
pairs: assetPairs{
asset.Spot: currency.Pairs{ethusdcPair, btcusdtPair, currency.NewPair(currency.LTC, currency.USDT)},
asset.Futures: currency.Pairs{ethusdcPair, btcusdtPair},
asset.Index: currency.Pairs{btcusdtPair},
},
}
}

func (m *mockEx) IsAssetWebsocketSupported(a asset.Item) bool {
return a != asset.Index
}

func (m *mockEx) GetEnabledPairs(_ asset.Item) (currency.Pairs, error) {
return m.pairs, m.errPairs
func (m *mockEx) GetEnabledPairs(a asset.Item) (currency.Pairs, error) {
return m.pairs[a], m.errPairs
}

func (m *mockEx) GetPairFormat(_ asset.Item, _ bool) (currency.PairFormat, error) {
return currency.PairFormat{Uppercase: true}, m.errFormat
return currency.PairFormat{Uppercase: true, Delimiter: "-"}, m.errFormat
}

func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, error) {
Expand All @@ -65,7 +62,14 @@ func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, e
ap[asset.Spot] = ap[asset.Spot][0:1]
return ""
},
"batch": common.Batch[currency.Pairs],
"batch": func(pairs currency.Pairs, size int) []string {
s := []string{}
for _, p := range common.Batch(pairs, size) {
p = p.Format(currency.PairFormat{Uppercase: true})
s = append(s, p.Join())
}
return s
},
}).
ParseFiles("testdata/" + m.tpl)
}
Expand Down
31 changes: 16 additions & 15 deletions exchanges/subscription/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,6 @@ func (l List) SetStates(state State) error {
return err
}

func fillAssetPairs(ap assetPairs, a asset.Item, e IExchange) error {
p, err := e.GetEnabledPairs(a)
if err != nil {
return err
}
f, err := e.GetPairFormat(a, true)
if err != nil {
return err
}
ap[a] = common.SortStrings(p.Format(f))
return nil
}

// assetPairs returns a map of enabled pairs for the subscriptions in the list, formatted for the asset
func (l List) assetPairs(e IExchange) (assetPairs, error) {
at := []asset.Item{}
Expand All @@ -122,13 +109,13 @@ func (l List) assetPairs(e IExchange) (assetPairs, error) {
// Nothing to do
case asset.All:
for _, a := range at {
if err := fillAssetPairs(ap, a, e); err != nil {
if err := ap.populate(e, a); err != nil {
return nil, err
}
}
default:
if slices.Contains(at, s.Asset) {
if err := fillAssetPairs(ap, s.Asset, e); err != nil {
if err := ap.populate(e, s.Asset); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -169,3 +156,17 @@ func (l List) Public() List {
}
return n
}

// populate adds all enabled pairs for an asset to the assetPair map
func (ap assetPairs) populate(e IExchange, a asset.Item) error {
p, err := e.GetEnabledPairs(a)
if err != nil || len(p) == 0 {
return err
}
f, err := e.GetPairFormat(a, true)
if err != nil {
return err
}
ap[a] = common.SortStrings(p.Format(f))
return nil
}
25 changes: 25 additions & 0 deletions exchanges/subscription/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,31 @@ func TestAssetPairs(t *testing.T) {
}
}

// TestAssetPairsPopulate exercises assetPairs Populate
func TestAssetPairsPopulate(t *testing.T) {
e := newMockEx()
ap := assetPairs{}
err := ap.populate(e, asset.Spot)
require.NoError(t, err)
require.NotEmpty(t, ap)
assert.Equal(t, 3, len(ap[asset.Spot]), "populate should return correct number of pairs for spot")
assert.Equal(t, "BTC-USDT", ap[asset.Spot][0].String(), "populate should respect format and sort the pairs")
err = ap.populate(e, asset.Futures)
require.NoError(t, err)
assert.Equal(t, 2, len(ap[asset.Futures]), "populate should return correct number of pairs for futures")

exp := errors.New("expected error")
e.errFormat = exp
err = ap.populate(e, asset.Spot)
assert.ErrorIs(t, err, exp, "populate should error correctly on format error")

e.pairs = assetPairs{}
ap = assetPairs{}
err = ap.populate(e, asset.Spot)
require.NoError(t, err, "populate must not error with no pairs enabled")
assert.Empty(t, ap, "populate should return an empty map with no pairs enabled")
}

func TestListClone(t *testing.T) {
t.Parallel()
l := List{{Channel: TickerChannel}, {Channel: OrderbookChannel}}
Expand Down
39 changes: 25 additions & 14 deletions exchanges/subscription/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,40 @@ func TestExpandTemplates(t *testing.T) {
{Channel: "expand-pairs", Asset: asset.Spot, Levels: 2},
{Channel: "single-channel", QualifiedChannel: "just one sub already processed"},
{Channel: "update-asset-pairs", Asset: asset.All},
{Channel: "expand-pairs", Asset: asset.Spot, Pairs: e.pairs[0:2], Levels: 3},
{Channel: "expand-pairs", Asset: asset.Spot, Pairs: e.pairs[asset.Spot][0:2], Levels: 3},
{Channel: "batching", Asset: asset.Spot},
{Channel: "single-channel", Authenticated: true},
}
got, err := l.ExpandTemplates(e)
require.NoError(t, err, "ExpandTemplates must not error")
exp := List{
{Channel: "single-channel", QualifiedChannel: "single-channel"},
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@15m", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.FifteenMin},
{Channel: "expand-assets", QualifiedChannel: "future-expand-assets@15m", Asset: asset.Futures, Pairs: e.pairs, Interval: kline.FifteenMin},
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@15m", Asset: asset.Spot, Pairs: e.pairs[asset.Spot], Interval: kline.FifteenMin},
{Channel: "expand-assets", QualifiedChannel: "future-expand-assets@15m", Asset: asset.Futures, Pairs: e.pairs[asset.Futures], Interval: kline.FifteenMin},
{Channel: "single-channel", QualifiedChannel: "just one sub already processed"},
{Channel: "update-asset-pairs", QualifiedChannel: "spot-btcusdt-update-asset-pairs", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair}},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDTBTC-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[:1], Levels: 3},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDCETH-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[1:2], Levels: 3},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDTBTC-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[asset.Spot][1:2], Levels: 3},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDCETH-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[asset.Spot][:1], Levels: 3},
}
for _, p := range e.pairs {
exp = append(exp, List{
{Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 1},
{Channel: "expand-pairs", QualifiedChannel: "future-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Futures, Pairs: currency.Pairs{p}, Levels: 1},
{Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@2", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 2},
}...)
for a, pairs := range e.pairs {
if a == asset.Index { // Not IsAssetWebsocketEnabled
continue
}
for _, p := range common.SortStrings(pairs) {
pStr := p.Swap().String()
if a == asset.Spot {
exp = append(exp, List{
{Channel: "expand-pairs", QualifiedChannel: "spot-" + pStr + "-expand-pairs@1", Asset: a, Pairs: currency.Pairs{p}, Levels: 1},
&Subscription{Channel: "expand-pairs", QualifiedChannel: "spot-" + pStr + "-expand-pairs@2", Asset: a, Pairs: currency.Pairs{p}, Levels: 2},
}...)
} else {
exp = append(exp,
&Subscription{Channel: "expand-pairs", QualifiedChannel: "future-" + pStr + "-expand-pairs@1", Asset: a, Pairs: currency.Pairs{p}, Levels: 1},
)
}
}
}
for _, b := range common.Batch(common.SortStrings(e.pairs), 3) {
for _, b := range common.Batch(common.SortStrings(e.pairs[asset.Spot]), 3) {
exp = append(exp, &Subscription{Channel: "batching", QualifiedChannel: "spot-" + b.Join() + "-batching", Asset: asset.Spot, Pairs: b})
}

Expand All @@ -73,9 +84,9 @@ func TestExpandTemplates(t *testing.T) {
got, err = l.ExpandTemplates(e)
require.NoError(t, err, "ExpandTemplates must not error")
exp = List{
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@1h", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.OneHour},
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@1h", Asset: asset.Spot, Pairs: e.pairs[asset.Spot], Interval: kline.OneHour},
}
for _, p := range e.pairs {
for _, p := range e.pairs[asset.Spot] {
exp = append(exp, List{
{Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@4", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 4},
}...)
Expand Down
1 change: 0 additions & 1 deletion exchanges/subscription/testdata/errors.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
{{/* Incorrect number of pair entries */}}
{{- .PairSeparator -}}
{{- .PairSeparator -}}
{{- .PairSeparator -}}
{{- else if eq .S.Channel "error4" }}
{{/* Too many BatchSize commands */}}
{{- range $asset, $pairs := $.AssetPairs }}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/subscription/testdata/subscriptions.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
{{- range $asset, $pairs := $.AssetPairs }}
{{- if eq $asset.String "spot" }}
{{- range $batch := batch $pairs 3 -}}
{{ assetName $asset }}-{{ $batch.Join -}} -batching
{{ assetName $asset }}-{{ $batch -}} -batching
{{- $.PairSeparator -}}
{{- end -}}
{{- $.BatchSize -}} 3
Expand Down

0 comments on commit d069ff2

Please sign in to comment.