Skip to content

Commit

Permalink
remove exchangeName for AddTradesToBuffer
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Sijun <[email protected]>
  • Loading branch information
junnplus committed Mar 5, 2025
1 parent 7fa2592 commit 0032f4b
Show file tree
Hide file tree
Showing 23 changed files with 69 additions and 72 deletions.
2 changes: 1 addition & 1 deletion exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.
}

if b.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(b.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions exchanges/binanceus/binanceus_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ func (bi *Binanceus) UpdateOrderbook(ctx context.Context, pair currency.Pair, as

orderbookNew, err := bi.GetOrderBookDepth(ctx, &OrderBookDataRequestParams{
Symbol: pair,
Limit: 1000})
Limit: 1000,
})
if err != nil {
return book, err
}
Expand Down Expand Up @@ -450,7 +451,7 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset
}

if bi.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(bi.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}
Expand Down
24 changes: 14 additions & 10 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)

var (
errParsingWSField = errors.New("error parsing WS field")
)
var errParsingWSField = errors.New("error parsing WS field")

const (
authenticatedBitfinexWebsocketEndpoint = "wss://api.bitfinex.com/ws/2"
Expand Down Expand Up @@ -112,8 +110,10 @@ type checksum struct {
}

// checksumStore quick global for now
var checksumStore = make(map[int]*checksum)
var cMtx sync.Mutex
var (
checksumStore = make(map[int]*checksum)
cMtx sync.Mutex
)

var subscriptionNames = map[string]string{
subscription.TickerChannel: wsTickerChannel,
Expand Down Expand Up @@ -724,12 +724,14 @@ func (b *Bitfinex) handleWSBookUpdate(c *subscription.Subscription, d []interfac
ID: int64(id),
Period: int64(pricePeriod),
Price: rateAmount,
Amount: amount})
Amount: amount,
})
} else {
newOrderbook = append(newOrderbook, WebsocketBook{
ID: int64(id),
Price: pricePeriod,
Amount: rateAmount})
Amount: rateAmount,
})
}
}
if err := b.WsInsertSnapshot(c.Pairs[0], c.Asset, newOrderbook, fundingRate); err != nil {
Expand All @@ -756,12 +758,14 @@ func (b *Bitfinex) handleWSBookUpdate(c *subscription.Subscription, d []interfac
ID: int64(id),
Period: int64(pricePeriod),
Price: amountRate,
Amount: amount})
Amount: amount,
})
} else {
newOrderbook = append(newOrderbook, WebsocketBook{
ID: int64(id),
Price: pricePeriod,
Amount: amountRate})
Amount: amountRate,
})
}

if err := b.WsUpdateOrderbook(c, c.Pairs[0], c.Asset, newOrderbook, int64(sequenceNo), fundingRate); err != nil {
Expand Down Expand Up @@ -986,7 +990,7 @@ func (b *Bitfinex) handleWSAllTrades(s *subscription.Subscription, respRaw []byt
}
}
if b.IsSaveTradeDataEnabled() {
err = trade.AddTradesToBuffer(b.GetName(), trades...)
err = trade.AddTradesToBuffer(trades...)
}
return err
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (b *Bitstamp) handleWSTrade(msg []byte) error {
if wsTradeTemp.Data.Type == 1 {
side = order.Sell
}
return trade.AddTradesToBuffer(b.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0),
CurrencyPair: p,
AssetType: asset.Spot,
Expand Down
4 changes: 2 additions & 2 deletions exchanges/btcmarkets/btcmarkets_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error {
side = order.Sell
}

return trade.AddTradesToBuffer(b.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: t.Timestamp,
CurrencyPair: p,
AssetType: asset.Spot,
Expand Down Expand Up @@ -254,7 +254,7 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error {
originalAmount := orderData.OpenVolume
var price float64
var trades []order.TradeHistory
var orderID = strconv.FormatInt(orderData.OrderID, 10)
orderID := strconv.FormatInt(orderData.OrderID, 10)
for x := range orderData.Trades {
var isMaker bool
if orderData.Trades[x].LiquidityType == "Maker" {
Expand Down
2 changes: 1 addition & 1 deletion exchanges/btse/btse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (b *BTSE) wsHandleData(respRaw []byte) error {
TID: strconv.FormatInt(tradeHistory.Data[x].ID, 10),
})
}
return trade.AddTradesToBuffer(b.Name, trades...)
return trade.AddTradesToBuffer(trades...)
case strings.Contains(topic, "orderBookL2Api"): // TODO: Fix orderbook updates.
var t wsOrderBook
err = json.Unmarshal(respRaw, &t)
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func (by *Bybit) wsProcessPublicTrade(assetType asset.Item, resp *WebsocketRespo
TID: result[x].TradeID,
}
}
return trade.AddTradesToBuffer(by.Name, tradeDatas...)
return trade.AddTradesToBuffer(tradeDatas...)
}

func (by *Bybit) wsProcessOrderbook(assetType asset.Item, resp *WebsocketResponse) error {
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bybit/bybit_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (by *Bybit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType
}

if by.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(by.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/coinbasepro/coinbasepro_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error {
if !c.IsSaveTradeDataEnabled() {
return nil
}
return trade.AddTradesToBuffer(c.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: wsOrder.Time,
Exchange: c.Name,
CurrencyPair: p,
Expand Down
12 changes: 5 additions & 7 deletions exchanges/coinut/coinut_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ const (
coinutWebsocketRateLimit = 30
)

var (
channels map[string]chan []byte
)
var channels map[string]chan []byte

// NOTE for speed considerations
// wss://wsapi-as.coinut.com
Expand Down Expand Up @@ -310,7 +308,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error {
TID: strconv.FormatInt(tradeSnap.Trades[i].TransID, 10),
})
}
return trade.AddTradesToBuffer(c.Name, trades...)
return trade.AddTradesToBuffer(trades...)
case "inst_trade_update":
if !c.IsSaveTradeDataEnabled() {
return nil
Expand Down Expand Up @@ -341,7 +339,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error {
}
}

return trade.AddTradesToBuffer(c.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: time.Unix(0, tradeUpdate.Timestamp*1000),
CurrencyPair: p,
AssetType: asset.Spot,
Expand Down Expand Up @@ -389,7 +387,7 @@ func (c *COINUT) parseOrderContainer(oContainer *wsOrderContainer) (*order.Detai
var oSide order.Side
var oStatus order.Status
var err error
var orderID = strconv.FormatInt(oContainer.OrderID, 10)
orderID := strconv.FormatInt(oContainer.OrderID, 10)
if oContainer.Side != "" {
oSide, err = order.StringToOrderSide(oContainer.Side)
if err != nil {
Expand Down Expand Up @@ -582,7 +580,7 @@ func (c *COINUT) WsProcessOrderbookUpdate(update *WsOrderbookUpdate) error {

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (c *COINUT) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{"inst_tick", "inst_order_book", "inst_trade"}
channels := []string{"inst_tick", "inst_order_book", "inst_trade"}
var subscriptions subscription.List
enabledPairs, err := c.GetEnabledPairs(asset.Spot)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions exchanges/deribit/deribit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (d *Deribit) processUserOrderChanges(respRaw []byte, channels []string) err
AssetType: a,
}
}
err = trade.AddTradesToBuffer(d.Name, td...)
err = trade.AddTradesToBuffer(td...)
if err != nil {
return err
}
Expand Down Expand Up @@ -513,7 +513,7 @@ func (d *Deribit) processTrades(respRaw []byte, channels []string) error {
AssetType: a,
}
}
return trade.AddTradesToBuffer(d.Name, tradeDatas...)
return trade.AddTradesToBuffer(tradeDatas...)
}

func (d *Deribit) processIncrementalTicker(respRaw []byte, channels []string) error {
Expand Down
4 changes: 2 additions & 2 deletions exchanges/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ func (b *Base) AddTradesToBuffer(trades ...trade.Data) error {
if !b.IsSaveTradeDataEnabled() {
return nil
}
return trade.AddTradesToBuffer(b.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}

// IsSaveTradeDataEnabled checks the state of
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func (e *Endpoints) GetURL(key URL) (string, error) {
// GetURLMap gets all urls for either running or default map based on the bool value supplied
func (e *Endpoints) GetURLMap() map[string]string {
e.mu.RLock()
var urlMap = make(map[string]string)
urlMap := make(map[string]string)
for k, v := range e.defaults {
urlMap[k] = v
}
Expand Down
4 changes: 2 additions & 2 deletions exchanges/gemini/gemini_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (g *Gemini) wsHandleData(respRaw []byte) error {
TID: strconv.FormatInt(result.EventID, 10),
}

return trade.AddTradesToBuffer(g.Name, tradeEvent)
return trade.AddTradesToBuffer(tradeEvent)
case "subscription_ack":
var result WsSubscriptionAcknowledgementResponse
err := json.Unmarshal(respRaw, &result)
Expand Down Expand Up @@ -563,7 +563,7 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error {
}
}

return trade.AddTradesToBuffer(g.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}

func channelName(s *subscription.Subscription) string {
Expand Down
7 changes: 2 additions & 5 deletions exchanges/hitbtc/hitbtc_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (h *HitBTC) wsHandleData(respRaw []byte) error {
TID: strconv.FormatInt(tradeSnapshot.Params.Data[i].ID, 10),
})
}
return trade.AddTradesToBuffer(h.Name, trades...)
return trade.AddTradesToBuffer(trades...)
case "activeOrders":
var o wsActiveOrdersResponse
err := json.Unmarshal(respRaw, &o)
Expand Down Expand Up @@ -292,10 +292,7 @@ func (h *HitBTC) wsHandleData(respRaw []byte) error {
return err
}
}
case
"replaced",
"canceled",
"new":
case "replaced", "canceled", "new":
var o wsOrderResponse
err := json.Unmarshal(respRaw, &o)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (h *HUOBI) wsReadMsgs(s stream.Connection) {
}
}
}

func (h *HUOBI) wsHandleData(respRaw []byte) error {
if id, err := jsonparser.GetString(respRaw, "id"); err == nil {
if h.Websocket.Match.IncomingWithData(id, respRaw) {
Expand Down Expand Up @@ -255,7 +256,7 @@ func (h *HUOBI) wsHandleAllTradesMsg(s *subscription.Subscription, respRaw []byt
TID: strconv.FormatFloat(t.Tick.Data[i].TradeID, 'f', -1, 64),
})
}
return trade.AddTradesToBuffer(h.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}

func (h *HUOBI) wsHandleTickerMsg(s *subscription.Subscription, respRaw []byte) error {
Expand Down
5 changes: 2 additions & 3 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
if err != nil {
return err
}
var tSide = order.Buy
tSide := order.Buy
s, ok := t[3].(string)
if !ok {
return common.GetTypeAssertError("string", t[3], "trade.side")
Expand All @@ -593,7 +593,7 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
}
}
if saveTradeData {
return trade.AddTradesToBuffer(k.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
return nil
}
Expand Down Expand Up @@ -1094,7 +1094,6 @@ func (k *Kraken) manageSubs(op string, subs subscription.List) error {

// Ignore an overall timeout, because we'll track individual subscriptions in handleSubResps
err = common.ExcludeError(err, stream.ErrSignatureTimeout)

if err != nil {
return fmt.Errorf("%w; Channel: %s Pair: %s", err, s.Channel, s.Pairs)
}
Expand Down
5 changes: 3 additions & 2 deletions exchanges/kucoin/kucoin_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ func (ku *Kucoin) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (
Total: accountH[x].Balance.Float64(),
Hold: accountH[x].Holds.Float64(),
Free: accountH[x].Available.Float64(),
}},
},
},
})
}
default:
Expand Down Expand Up @@ -588,7 +589,7 @@ func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp
return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, assetType)
}
if ku.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(ku.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 0032f4b

Please sign in to comment.