From 6a12553d2e0034f6d141345bb0ecd413cfc2650b Mon Sep 17 00:00:00 2001 From: buck54321 Date: Mon, 30 Oct 2023 16:54:56 -0700 Subject: [PATCH] apidata: store completed candles in db (#2443) * cache api candles --- client/asset/dcr/spv.go | 9 +- client/orderbook/orderbook.go | 5 +- dex/candles/candles.go | 26 +++- dex/networks/zec/block.go | 5 +- dex/utils/generics.go | 10 ++ server/apidata/apidata.go | 42 +++++- server/apidata/apidata_test.go | 8 ++ server/db/driver/pg/candles_online_test.go | 60 +++++++++ server/db/driver/pg/epochs.go | 142 ++++++++++++++++++++- server/db/driver/pg/internal/epochs.go | 31 +++++ server/db/driver/pg/markets.go | 8 ++ server/db/driver/pg/tables.go | 19 +++ server/db/interface.go | 2 + server/dex/dex.go | 2 +- 14 files changed, 343 insertions(+), 26 deletions(-) create mode 100644 dex/utils/generics.go create mode 100644 server/db/driver/pg/candles_online_test.go diff --git a/client/asset/dcr/spv.go b/client/asset/dcr/spv.go index ef14d58e48..68b09b1450 100644 --- a/client/asset/dcr/spv.go +++ b/client/asset/dcr/spv.go @@ -21,6 +21,7 @@ import ( "decred.org/dcrdex/client/asset" "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/utils" "decred.org/dcrwallet/v3/chain" walleterrors "decred.org/dcrwallet/v3/errors" "decred.org/dcrwallet/v3/p2p" @@ -1050,7 +1051,7 @@ func (w *spvWallet) ticketsInRange(ctx context.Context, lowerHeight, upperHeight // If this is a mempool scan, we cannot scan backwards, so reverse the // result order. if includeMempool { - reverseSlice(tickets) + utils.ReverseSlice(tickets) } return tickets, nil @@ -1419,12 +1420,6 @@ func initLogging(netDir string) error { return nil } -func reverseSlice[T any](s []T) { - for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { - s[i], s[j] = s[j], s[i] - } -} - func ticketSummaryToAssetTicket(ticketSummary *wallet.TicketSummary, hdr *wire.BlockHeader, log dex.Logger) *asset.Ticket { spender := "" if ticketSummary.Spender != nil { diff --git a/client/orderbook/orderbook.go b/client/orderbook/orderbook.go index b575969359..2d4ef3f8e6 100644 --- a/client/orderbook/orderbook.go +++ b/client/orderbook/orderbook.go @@ -12,6 +12,7 @@ import ( "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/msgjson" "decred.org/dcrdex/dex/order" + "decred.org/dcrdex/dex/utils" ) // ErrEmptyOrderbook is returned from MidGap when the order book is empty. @@ -690,9 +691,7 @@ func (ob *OrderBook) AddRecentMatches(matches [][2]int64, ts uint64) []*MatchSum } // Put the newest first. - for i, j := 0, len(newMatches)-1; i < j; i, j = i+1, j-1 { - newMatches[i], newMatches[j] = newMatches[j], newMatches[i] - } + utils.ReverseSlice(newMatches) ob.matchSummaryMtx.Lock() defer ob.matchSummaryMtx.Unlock() diff --git a/dex/candles/candles.go b/dex/candles/candles.go index c52edbf9b0..00e1621179 100644 --- a/dex/candles/candles.go +++ b/dex/candles/candles.go @@ -7,6 +7,7 @@ import ( "time" "decred.org/dcrdex/dex/msgjson" + "decred.org/dcrdex/dex/utils" ) const ( @@ -68,7 +69,7 @@ func (c *Cache) Add(candle *Candle) { return } c.Candles = append(c.Candles, *candle) - c.cursor = sz // len(c.candles) - 1 + c.cursor = sz // len(c.Candles) - 1 } func (c *Cache) Reset() { @@ -181,6 +182,29 @@ func (c *Cache) Last() *Candle { return &c.Candles[c.cursor] } +// CompletedCandlesSince returns any candles that fall into an epoch after the +// epoch of the provided timestamp, and before the current epoch. +func (c *Cache) CompletedCandlesSince(lastStoredEndStamp uint64) (cs []*Candle) { + currentIdx := uint64(time.Now().UnixMilli()) / c.BinSize + lastStoredIdx := lastStoredEndStamp / c.BinSize + + sz := len(c.Candles) + for i := 0; i < sz; i++ { + // iterate backwards + candle := &c.Candles[(c.cursor+sz-i)%sz] + epochIdx := candle.EndStamp / c.BinSize + if epochIdx >= currentIdx { + continue + } + if epochIdx <= lastStoredIdx { + break + } + cs = append(cs, candle) + } + utils.ReverseSlice(cs) + return +} + // combineCandles attempts to add the candidate candle to the target candle // in-place, if they're in the same bin, otherwise returns false. func (c *Cache) combineCandles(target, candidate *Candle) bool { diff --git a/dex/networks/zec/block.go b/dex/networks/zec/block.go index 67038ed17e..bbdea363b0 100644 --- a/dex/networks/zec/block.go +++ b/dex/networks/zec/block.go @@ -9,6 +9,7 @@ import ( "io" "time" + "decred.org/dcrdex/dex/utils" "github.com/btcsuite/btcd/wire" ) @@ -119,8 +120,6 @@ func readInternalByteOrder(r io.Reader, b []byte) error { return err } // Reverse the bytes - for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 { - b[i], b[j] = b[j], b[i] - } + utils.ReverseSlice(b) return nil } diff --git a/dex/utils/generics.go b/dex/utils/generics.go new file mode 100644 index 0000000000..e3e0d0091b --- /dev/null +++ b/dex/utils/generics.go @@ -0,0 +1,10 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package utils + +func ReverseSlice[T any](s []T) { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } +} diff --git a/server/apidata/apidata.go b/server/apidata/apidata.go index fcfa0875c9..e00a84ad06 100644 --- a/server/apidata/apidata.go +++ b/server/apidata/apidata.go @@ -27,6 +27,8 @@ var ( // caches at startup. type DBSource interface { LoadEpochStats(base, quote uint32, caches []*candles.Cache) error + LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error) + InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error } // MarketSource is a source of market information. Markets are added after @@ -43,6 +45,11 @@ type BookSource interface { Book(mktName string) (*msgjson.OrderBook, error) } +type cacheWithStoredTime struct { + *candles.Cache + lastStoredEndStamp uint64 // protected by DataAPI.cacheMtx +} + // DataAPI is a data API backend. type DataAPI struct { db DBSource @@ -53,7 +60,7 @@ type DataAPI struct { spots map[string]json.RawMessage cacheMtx sync.RWMutex - marketCaches map[string]map[uint64]*candles.Cache + marketCaches map[string]map[uint64]*cacheWithStoredTime } // NewDataAPI is the constructor for a new DataAPI. @@ -62,7 +69,7 @@ func NewDataAPI(dbSrc DBSource) *DataAPI { db: dbSrc, epochDurations: make(map[string]uint64), spots: make(map[string]json.RawMessage), - marketCaches: make(map[string]map[uint64]*candles.Cache), + marketCaches: make(map[string]map[uint64]*cacheWithStoredTime), } if atomic.CompareAndSwapUint32(&started, 0, 1) { @@ -81,18 +88,25 @@ func (s *DataAPI) AddMarketSource(mkt MarketSource) error { } epochDur := mkt.EpochDuration() s.epochDurations[mktName] = epochDur - binCaches := make(map[uint64]*candles.Cache, len(binSizes)+1) - s.marketCaches[mktName] = binCaches + binCaches := make(map[uint64]*cacheWithStoredTime, len(binSizes)+1) cacheList := make([]*candles.Cache, 0, len(binSizes)+1) for _, binSize := range append([]uint64{epochDur}, binSizes...) { cache := candles.NewCache(candles.CacheSize, binSize) + lastCandleEndStamp, err := s.db.LastCandleEndStamp(mkt.Base(), mkt.Quote(), cache.BinSize) + if err != nil { + return fmt.Errorf("LastCandleEndStamp: %w", err) + } + c := &cacheWithStoredTime{cache, lastCandleEndStamp} cacheList = append(cacheList, cache) - binCaches[binSize] = cache + binCaches[binSize] = c } err = s.db.LoadEpochStats(mkt.Base(), mkt.Quote(), cacheList) if err != nil { return err } + s.cacheMtx.Lock() + s.marketCaches[mktName] = binCaches + s.cacheMtx.Unlock() return nil } @@ -120,7 +134,7 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche epochDur := s.epochDurations[mktName] startStamp := epochIdx * epochDur endStamp := startStamp + epochDur - var cache5min *candles.Cache + var cache5min *cacheWithStoredTime const fiveMins = uint64(time.Minute * 5 / time.Millisecond) candle := &candles.Candle{ StartStamp: startStamp, @@ -137,6 +151,21 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche cache5min = cache } cache.Add(candle) + + // Check if any candles need to be inserted. + // Don't insert epoch candles. + if cache.BinSize == epochDur { + continue + } + + newCandles := cache.CompletedCandlesSince(cache.lastStoredEndStamp) + if len(newCandles) == 0 { + continue + } + if err := s.db.InsertCandles(base, quote, cache.BinSize, newCandles); err != nil { + return 0, 0, 0, 0, fmt.Errorf("InsertCandles: %w", err) + } + cache.lastStoredEndStamp = newCandles[len(newCandles)-1].EndStamp } if cache5min == nil { return 0, 0, 0, 0, fmt.Errorf("no 5 minute cache") @@ -161,6 +190,7 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche High24: high24, Low24: low24, } + s.spotsMtx.Lock() s.spots[mktName], err = json.Marshal(spot) s.spotsMtx.Unlock() diff --git a/server/apidata/apidata_test.go b/server/apidata/apidata_test.go index bdd5adbb14..aaa05ff0a3 100644 --- a/server/apidata/apidata_test.go +++ b/server/apidata/apidata_test.go @@ -32,6 +32,14 @@ func (db *TDBSource) LoadEpochStats(base, quote uint32, caches []*candles.Cache) return db.loadEpochErr } +func (db *TDBSource) LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error) { + return 0, nil +} + +func (db *TDBSource) InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error { + return nil +} + type TBookSource struct { book *msgjson.OrderBook } diff --git a/server/db/driver/pg/candles_online_test.go b/server/db/driver/pg/candles_online_test.go new file mode 100644 index 0000000000..431c788934 --- /dev/null +++ b/server/db/driver/pg/candles_online_test.go @@ -0,0 +1,60 @@ +//go:build pgonline + +package pg + +import ( + "testing" + + "decred.org/dcrdex/dex/candles" +) + +func TestCandles(t *testing.T) { + if err := cleanTables(archie.db); err != nil { + t.Fatalf("cleanTables: %v", err) + } + + var baseID, quoteID uint32 = 42, 0 + var candleDur uint64 = 5 * 60 * 1000 + + lastCandle, err := archie.LastCandleEndStamp(baseID, quoteID, candleDur) + if err != nil { + t.Fatalf("Initial LastCandleEndStamp error: %v", err) + } + + cands := []*candles.Candle{ + {EndStamp: candleDur}, + {EndStamp: candleDur * 2}, + } + + if err = archie.InsertCandles(baseID, quoteID, candleDur, cands); err != nil { + t.Fatalf("InsertCandles error: %v", err) + } + + lastCandle, err = archie.LastCandleEndStamp(baseID, quoteID, candleDur) + if err != nil { + t.Fatalf("LastCandleEndStamp error: %v", err) + } + + if lastCandle != candleDur*2 { + t.Fatalf("Wrong last candle. Wanted 2, got %d", lastCandle) + } + + // Updating is fine + cands[1].MatchVolume = 1 + if err = archie.InsertCandles(baseID, quoteID, candleDur, []*candles.Candle{cands[1]}); err != nil { + t.Fatalf("InsertCandles (overwrite) error: %v", err) + } + + cache := candles.NewCache(5, candleDur) + if err = archie.LoadEpochStats(baseID, quoteID, []*candles.Cache{cache}); err != nil { + t.Fatalf("LoadEpochStats error: %v", err) + } + + if len(cache.Candles) != 2 { + t.Fatalf("Expected 2 candles, got %d", len(cache.Candles)) + } + + if cache.Last().MatchVolume != 1 { + t.Fatalf("Overwrite failed") + } +} diff --git a/server/db/driver/pg/epochs.go b/server/db/driver/pg/epochs.go index fb003f87a0..066bfb960d 100644 --- a/server/db/driver/pg/epochs.go +++ b/server/db/driver/pg/epochs.go @@ -9,6 +9,7 @@ import ( "database/sql/driver" "errors" "fmt" + "math" "time" "decred.org/dcrdex/dex/candles" @@ -112,13 +113,39 @@ func (a *Archiver) LoadEpochStats(base, quote uint32, caches []*candles.Cache) e ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout) defer cancel() + // First. load stored candles from the candles table. Establish a start + // stamp for scanning epoch reports for partial candles. + var oldestNeeded uint64 = math.MaxUint64 + sinceCaches := make(map[uint64]*candles.Cache, 0) // maps oldest end stamp + now := uint64(time.Now().UnixMilli()) + for _, cache := range caches { + if err = a.loadCandles(base, quote, cache, candles.CacheSize); err != nil { + return fmt.Errorf("loadCandles: %w", err) + } + + var since uint64 + if len(cache.Candles) > 0 { + // If we have candles, set our since value to the next expected + // epoch stamp. + idx := cache.Last().EndStamp / cache.BinSize + since = (idx + 1) * cache.BinSize + } else { + since = now - (cache.BinSize * candles.CacheSize) + since = since - since%cache.BinSize // truncate to first end stamp of the epoch + } + if since < oldestNeeded { + oldestNeeded = since + } + sinceCaches[since] = cache + } + tstart := time.Now() defer func() { log.Debugf("select epoch candles in: %v", time.Since(tstart)) }() stmt := fmt.Sprintf(internal.SelectEpochCandles, epochReportsTableName) - rows, err := a.db.QueryContext(ctx, stmt, 0) + rows, err := a.db.QueryContext(ctx, stmt, oldestNeeded) // +1 because candles aren't stored until the end stamp is surpassed. if err != nil { - return err + return fmt.Errorf("SelectEpochCandles: %w", err) } defer rows.Close() @@ -127,7 +154,7 @@ func (a *Archiver) LoadEpochStats(base, quote uint32, caches []*candles.Cache) e for rows.Next() { err = rows.Scan(&endStamp, &epochDur, &matchVol, "eVol, &highRate, &lowRate, &startRate, &endRate) if err != nil { - return err + return fmt.Errorf("Scan: %w", err) } candle := &candles.Candle{ StartStamp: uint64(endStamp - epochDur), @@ -139,10 +166,115 @@ func (a *Archiver) LoadEpochStats(base, quote uint32, caches []*candles.Cache) e StartRate: uint64(startRate), EndRate: uint64(endRate), } - for _, set := range caches { - set.Add(candle) + for since, cache := range sinceCaches { + if uint64(endStamp) > since { + cache.Add(candle) + } } } return rows.Err() } + +// LastCandleEndStamp pulls the last stored candles end stamp for a market and +// candle duration. +func (a *Archiver) LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error) { + marketSchema, err := a.marketSchema(base, quote) + if err != nil { + return 0, err + } + + tableName := fullCandlesTableName(a.dbName, marketSchema, candleDur) + + ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout) + defer cancel() + + stmt := fmt.Sprintf(internal.SelectLastEndStamp, tableName) + row := a.db.QueryRowContext(ctx, stmt) + var endStamp fastUint64 + if err = row.Scan(&endStamp); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return 0, err + } + return uint64(endStamp), nil +} + +// InsertCandles inserts new candles for a market and candle duration. +func (a *Archiver) InsertCandles(base, quote uint32, candleDur uint64, cs []*candles.Candle) error { + marketSchema, err := a.marketSchema(base, quote) + if err != nil { + return err + } + tableName := fullCandlesTableName(a.dbName, marketSchema, candleDur) + stmt := fmt.Sprintf(internal.InsertCandle, tableName) + + insert := func(c *candles.Candle) error { + ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout) + defer cancel() + + _, err = a.db.ExecContext(ctx, stmt, + c.EndStamp, c.MatchVolume, c.QuoteVolume, c.HighRate, c.LowRate, c.StartRate, c.EndRate, + ) + if err != nil { + a.fatalBackendErr(err) + return err + } + return nil + } + + for _, c := range cs { + if err = insert(c); err != nil { + return err + } + } + return nil +} + +// loadCandles loads the last n candles of a specified duration and market into +// the provided cache. +func (a *Archiver) loadCandles(base, quote uint32, cache *candles.Cache, n uint64) error { + marketSchema, err := a.marketSchema(base, quote) + if err != nil { + return err + } + + candleDur := cache.BinSize + + tableName := fullCandlesTableName(a.dbName, marketSchema, candleDur) + stmt := fmt.Sprintf(internal.SelectCandles, tableName) + + ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout) + defer cancel() + + rows, err := a.db.QueryContext(ctx, stmt, n) + if err != nil { + return fmt.Errorf("QueryContext: %w", err) + } + defer rows.Close() + + var endStamp, matchVol, quoteVol, highRate, lowRate, startRate, endRate fastUint64 + for rows.Next() { + err = rows.Scan(&endStamp, &matchVol, "eVol, &highRate, &lowRate, &startRate, &endRate) + if err != nil { + return fmt.Errorf("Scan: %w", err) + } + cache.Add(&candles.Candle{ + StartStamp: uint64(endStamp) - candleDur, + EndStamp: uint64(endStamp), + MatchVolume: uint64(matchVol), + QuoteVolume: uint64(quoteVol), + HighRate: uint64(highRate), + LowRate: uint64(lowRate), + StartRate: uint64(startRate), + EndRate: uint64(endRate), + }) + } + + if err = rows.Err(); err != nil { + return err + } + + return nil +} diff --git a/server/db/driver/pg/internal/epochs.go b/server/db/driver/pg/internal/epochs.go index 92f74bca7b..bae298a1ce 100644 --- a/server/db/driver/pg/internal/epochs.go +++ b/server/db/driver/pg/internal/epochs.go @@ -61,4 +61,35 @@ const ( FROM %s WHERE epoch_end >= $1 ORDER BY epoch_end;` + + // CreateEpochReportTable creates an candles table that holds binned + // candle data. + CreateCandlesTable = `CREATE TABLE IF NOT EXISTS %s ( + end_stamp INT8 PRIMARY KEY, + match_volume INT8, + quote_volume INT8, + high_rate INT8, + low_rate INT8, + start_rate INT8, + end_rate INT8 + );` + + InsertCandle = `INSERT INTO %s ( + end_stamp, match_volume, quote_volume, high_rate, low_rate, start_rate, end_rate + ) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (end_stamp) DO UPDATE + SET match_volume = $2, quote_volume = $3, high_rate = $4, low_rate = $5, start_rate = $6, end_rate = $7;` + + SelectCandles = `SELECT end_stamp, match_volume, quote_volume, + high_rate, low_rate, start_rate, end_rate + FROM %s + ORDER BY end_stamp + LIMIT $1;` + + SelectLastEndStamp = `SELECT (end_stamp) + FROM %s + ORDER BY end_stamp + DESC + LIMIT 1;` ) diff --git a/server/db/driver/pg/markets.go b/server/db/driver/pg/markets.go index 6430ba6cb9..e67b7b5c60 100644 --- a/server/db/driver/pg/markets.go +++ b/server/db/driver/pg/markets.go @@ -9,6 +9,7 @@ import ( "strings" "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/candles" "decred.org/dcrdex/server/db/driver/pg/internal" ) @@ -83,6 +84,13 @@ func createMarketTables(db *sql.DB, marketName string) error { } } + // Create tables for the candles. + for _, binSize := range append(candles.BinSizes, "epoch") { + if _, err := createTableStmt(db, internal.CreateCandlesTable, marketUID, candlesTableName+"_"+binSize); err != nil { + return err + } + } + return nil } diff --git a/server/db/driver/pg/tables.go b/server/db/driver/pg/tables.go index 3c7fe0ed46..9d5d6c139e 100644 --- a/server/db/driver/pg/tables.go +++ b/server/db/driver/pg/tables.go @@ -32,6 +32,7 @@ const ( cancelsArchivedTableName = "cancels_archived" cancelsActiveTableName = "cancels_active" epochReportsTableName = "epoch_reports" + candlesTableName = "candles" ) type tableStmt struct { @@ -120,6 +121,24 @@ func fullEpochReportsTableName(dbName, marketSchema string) string { return dbName + "." + marketSchema + "." + epochReportsTableName } +func fullCandlesTableName(dbName, marketSchema string, candleDur uint64) string { + const fiveMin = 5 * 60 * 1000 + const oneHour = 60 * 60 * 1000 + const aDay = 24 * oneHour + var binSize string + switch candleDur { + case fiveMin: + binSize = "5m" + case oneHour: + binSize = "1h" + case aDay: + binSize = "24h" + default: + binSize = "epoch" + } + return dbName + "." + marketSchema + "." + candlesTableName + "_" + binSize +} + // createTable creates one of the known tables by name. The table will be // created in the specified schema (schema.tableName). If schema is empty, // "public" is used. diff --git a/server/db/interface.go b/server/db/interface.go index c3a3153e74..135c3ca972 100644 --- a/server/db/interface.go +++ b/server/db/interface.go @@ -85,6 +85,8 @@ type DEXArchivist interface { // LoadEpochStats reads all market epoch history from the database. LoadEpochStats(uint32, uint32, []*candles.Cache) error + LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error) + InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error OrderArchiver AccountArchiver diff --git a/server/dex/dex.go b/server/dex/dex.go index 852a97514e..e2c24c650e 100644 --- a/server/dex/dex.go +++ b/server/dex/dex.go @@ -800,7 +800,7 @@ func NewDEX(ctx context.Context, cfg *DexConf) (*DEX, error) { log.Infof("Preparing historical market data API for market %v...", mktInf.Name) err = dataAPI.AddMarketSource(mkt) if err != nil { - return nil, fmt.Errorf("DataSource.AddMarket: %w", err) + return nil, fmt.Errorf("DataSource.AddMarketSource: %w", err) } // Having loaded the book, get the accounts owning the orders.