Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Huobi: Add subscription configuration #1604

Merged
merged 6 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions cmd/documentation/exchanges_templates/huobi.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,24 @@ if err != nil {
}
```

### How to do Websocket public/private calls
### Subscriptions

```go
// Exchanges will be abstracted out in further updates and examples will be
// supplied then
All subscriptions are for spot only.

Default Public Subscriptions:
- Ticker
- Candles ( Interval: 1min )
- Orderbook ( Level: 0 - No aggregation )
- Configure Level: 1-5 for depth aggregation, for example:
```json
{"enabled": true, "channel": "orderbook", "asset": "spot", "levels": 1}
```
- Trades

Default Authenticated Subscriptions:
- Account Trades
- Account Orders
- Account Updates

### Please click GoDocs chevron above to view current GoDoc information for this package
{{template "contributions"}}
Expand Down
20 changes: 16 additions & 4 deletions exchanges/huobi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,24 @@ if err != nil {
}
```

### How to do Websocket public/private calls
### Subscriptions

```go
// Exchanges will be abstracted out in further updates and examples will be
// supplied then
All subscriptions are for spot only.

Default Public Subscriptions:
- Ticker
- Candles ( Interval: 1min )
- Orderbook ( Level: 0 - No aggregation )
- Configure Level: 1-5 for depth aggregation, for example:
```json
{"enabled": true, "channel": "orderbook", "asset": "spot", "levels": 1}
```
- Trades

Default Authenticated Subscriptions:
- Account Trades
- Account Orders
- Account Updates

### Please click GoDocs chevron above to view current GoDoc information for this package

Expand Down
64 changes: 64 additions & 0 deletions exchanges/huobi/huobi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -2927,3 +2929,65 @@ func TestGetCurrencyTradeURL(t *testing.T) {
}
}
}

func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()

h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{}
for _, s := range h.Features.Subscriptions {
if s.Authenticated && !h.Websocket.CanUseAuthenticatedEndpoints() {
continue
}
for _, a := range h.GetAssetTypes(true) {
if s.Asset != asset.All && s.Asset != a {
continue
}
pairs, err := h.GetEnabledPairs(a)
require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a)
pairs = common.SortStrings(pairs).Format(currency.PairFormat{Uppercase: false, Delimiter: ""})
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.Asset = a
for i, p := range pairs {
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.QualifiedChannel = channelName(s, p)
switch s.Channel {
case subscription.OrderbookChannel:
s.QualifiedChannel += ".step0"
case subscription.CandlesChannel:
s.QualifiedChannel += ".1min"
}
s.Pairs = pairs[i : i+1]
exp = append(exp, s)
}
}
}
testsubs.EqualLists(t, exp, subs)
}

// TestSubscribe exercises live public subscriptions
func TestSubscribe(t *testing.T) {
t.Parallel()
h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.Features.Subscriptions.ExpandTemplates(h)
require.NoError(t, err, "ExpandTemplates must not error")
testexch.SetupWs(t, h)
err = h.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")
got := h.Websocket.GetSubscriptions()
require.Equal(t, 4, len(got), "Must get correct number of subscriptions")
for _, s := range got {
assert.Equal(t, subscription.SubscribedState, s.State())
}
}

func TestChannelName(t *testing.T) {
p := currency.NewPair(currency.BTC, currency.USD)
assert.Equal(t, "market.BTCUSD.kline", channelName(&subscription.Subscription{Channel: subscription.CandlesChannel}, p))
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: wsOrderbookChannel}, p) })
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: subscription.MyAccountChannel}, p) }, "Should panic on V2 endpoints until implemented")
}
175 changes: 97 additions & 78 deletions exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"text/template"
"time"

"github.com/gorilla/websocket"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
Expand All @@ -31,11 +33,13 @@ const (
baseWSURL = "wss://api.huobi.pro"
futuresWSURL = "wss://api.hbdm.com/"

wsMarketURL = baseWSURL + "/ws"
wsMarketKline = "market.%s.kline.1min"
wsMarketDepth = "market.%s.depth.step0"
wsMarketTrade = "market.%s.trade.detail"
wsMarketTicker = "market.%s.detail"
wsMarketURL = baseWSURL + "/ws"
wsCandlesChannel = "market.%s.kline"
wsOrderbookChannel = "market.%s.depth"
wsTradesChannel = "market.%s.trade.detail"
wsMarketDetailChannel = "market.%s.detail"
wsMyOrdersChannel = "orders.%s"
wsMyTradesChannel = "orders.%s.update"

wsAccountsOrdersEndPoint = "/ws/v1"
wsAccountsList = "accounts.list"
Expand All @@ -56,6 +60,28 @@ const (
loginDelay = 50 * time.Millisecond
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 0}, // Aggregation Levels; 0 is no depth aggregation
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyTradesChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
}

var subscriptionNames = map[string]string{
subscription.TickerChannel: wsMarketDetailChannel,
subscription.CandlesChannel: wsCandlesChannel,
subscription.OrderbookChannel: wsOrderbookChannel,
subscription.AllTradesChannel: wsTradesChannel,
/* TODO: Pending upcoming V2 support, these are dropped from the translation table so that the sub conf will be correct and not need upgrading, but will error on usage
subscription.MyTradesChannel: wsMyOrdersChannel,
subscription.MyOrdersChannel: wsMyTradesChannel,
subscription.MyAccountChannel: wsMyAccountChannel,
*/
}

// Instantiates a communications channel between websocket connections
var comms = make(chan WsMessage)

Expand Down Expand Up @@ -514,101 +540,66 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error {
return h.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (h *HUOBI) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{wsMarketKline,
wsMarketDepth,
wsMarketTrade,
wsMarketTicker}
var subscriptions subscription.List
if h.Websocket.CanUseAuthenticatedEndpoints() {
channels = append(channels, "orders.%v", "orders.%v.update")
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: "accounts",
})
}
enabledCurrencies, err := h.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
for i := range channels {
for j := range enabledCurrencies {
enabledCurrencies[j].Delimiter = ""
channel := fmt.Sprintf(channels[i],
enabledCurrencies[j].Lower().String())
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channel,
Pairs: currency.Pairs{enabledCurrencies[j]},
})
}
}
return subscriptions, nil
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (h *HUOBI) generateSubscriptions() (subscription.List, error) {
return h.Features.Subscriptions.ExpandTemplates(h)
}

// GetSubscriptionTemplate returns a subscription channel template
func (h *HUOBI) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"interval": h.FormatExchangeKlineInterval,
}).Parse(subTplText)
}

// Subscribe sends a websocket message to receive data from the channel
func (h *HUOBI) Subscribe(channelsToSubscribe subscription.List) error {
func (h *HUOBI) Subscribe(subs subscription.List) error {
ctx := context.Background()
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if len(subs.Private()) > 0 {
if creds, errs = h.GetCredentials(ctx); errs != nil {
return errs
}
}
var errs error
for i := range channelsToSubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToSubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToSubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"sub",
wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel,
channelsToSubscribe[i].Channel)
if s.Authenticated {
if err = h.wsAuthenticatedSubscribe(creds, "sub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel); err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, s)
}
} else {
err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{
Subscribe: channelsToSubscribe[i].Channel,
})
}
if err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, channelsToSubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
if err = h.Websocket.Conn.SendJSONMessage(ctx, request.Unset, WsRequest{Subscribe: s.QualifiedChannel}); err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.AuthConn, s)
}
}
errs = common.AppendError(errs, err)
}
return nil
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (h *HUOBI) Unsubscribe(channelsToUnsubscribe subscription.List) error {
func (h *HUOBI) Unsubscribe(subs subscription.List) error {
ctx := context.Background()
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if len(subs.Private()) > 0 {
if creds, errs = h.GetCredentials(ctx); errs != nil {
return errs
}
}
var errs error
for i := range channelsToUnsubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToUnsubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToUnsubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"unsub",
wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel,
channelsToUnsubscribe[i].Channel)
if s.Authenticated {
err = h.wsAuthenticatedSubscribe(creds, "unsub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel)
} else {
err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{
Unsubscribe: channelsToUnsubscribe[i].Channel,
})
err = h.Websocket.Conn.SendJSONMessage(ctx, request.Unset, WsRequest{Unsubscribe: s.QualifiedChannel})
}
if err == nil {
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, channelsToUnsubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, s)
}
errs = common.AppendError(errs, err)
}
return errs
}
Expand Down Expand Up @@ -810,3 +801,31 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe
}
return &response, nil
}

// channelName converts global channel Names used in config of channel input into exchange channel names
// returns the name unchanged if no match is found
func channelName(s *subscription.Subscription, p currency.Pair) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return fmt.Sprintf(n, p)
}
if s.Authenticated {
panic(fmt.Errorf("%w: Private endpoints not currently supported", common.ErrNotYetImplemented))
}
panic(subscription.ErrUseConstChannelName)
}

const subTplText = `
{{- if $.S.Asset }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs }}
{{- channelName $.S $p -}}
{{- if eq $.S.Channel "candles" -}} . {{- interval $.S.Interval }}{{ end }}
{{- if eq $.S.Channel "orderbook" -}} .step {{- $.S.Levels }}{{ end }}
{{ $.PairSeparator }}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
{{- else -}}
{{ channelName $.S nil }}
{{- end }}
`
3 changes: 2 additions & 1 deletion exchanges/huobi/huobi_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (h *HUOBI) SetDefaults() {
GlobalResultLimit: 2000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}

h.Requester, err = request.New(h.Name,
Expand Down Expand Up @@ -213,7 +214,7 @@ func (h *HUOBI) Setup(exch *config.Exchange) error {
Connector: h.WsConnect,
Subscriber: h.Subscribe,
Unsubscriber: h.Unsubscribe,
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
GenerateSubscriptions: h.generateSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
})
if err != nil {
Expand Down
Loading
Loading