Skip to content

Commit

Permalink
Merge pull request ethereum#591 from thanhnguyennguyen/sdkNode-ignore…
Browse files Browse the repository at this point in the history
…-matching-engine

Sdk nodes ignore matching engine
  • Loading branch information
ngtuna authored Jul 29, 2019
2 parents 56610b5 + dd83537 commit a1f9df1
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 131 deletions.
47 changes: 7 additions & 40 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package core
import (
"encoding/json"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/posv"
Expand All @@ -29,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/tomox"
sdktypes "github.com/tomochain/tomox-sdk/types"
)

// BlockValidator is responsible for validating block headers, uncles and
Expand Down Expand Up @@ -151,6 +148,13 @@ func (v *BlockValidator) validateMatchedOrder(tomoXService *tomox.TomoX, current
if err != nil {
return common.Hash{},fmt.Errorf("transaction match is corrupted. Failed decode order. Error: %s ", err.Error())
}

// SDK node doesn't need to run ME
if tomoXService.IsSDKNode() {
log.Debug("SDK node ignore running matching engine")
return order.Hash, nil
}

if err := order.VerifyMatchedOrder(currentState); err != nil {
return common.Hash{},err
}
Expand All @@ -177,10 +181,6 @@ func (v *BlockValidator) validateMatchedOrder(tomoXService *tomox.TomoX, current
return common.Hash{}, err
}

trades := txMatch.GetTrades()
if err := logTrades(tomoXService, tx.Hash(), order, trades); err != nil {
return common.Hash{}, err
}
return order.Hash, nil
}

Expand Down Expand Up @@ -215,36 +215,3 @@ func CalcGasLimit(parent *types.Block) uint64 {
return limit
}

func logTrades(tomoXService *tomox.TomoX, txHash common.Hash, order *tomox.OrderItem, trades []map[string]string) error {
log.Debug("Got trades", "number", len(trades), "trades", trades)
for _, trade := range trades {
tradeSDK := &sdktypes.Trade{}
if q, ok := trade["quantity"]; ok {
tradeSDK.Amount = new(big.Int)
tradeSDK.Amount.SetString(q, 10)
}
tradeSDK.PricePoint = order.Price
tradeSDK.PairName = order.PairName
tradeSDK.BaseToken = order.BaseToken
tradeSDK.QuoteToken = order.QuoteToken
tradeSDK.Status = sdktypes.TradeStatusSuccess
tradeSDK.Maker = order.UserAddress
tradeSDK.MakerOrderHash = order.Hash
if u, ok := trade["uAddr"]; ok {
tradeSDK.Taker = common.Address{}
tradeSDK.Taker.SetString(u)
}
tradeSDK.TakerOrderHash = order.Hash //FIXME: will update txMatch to include TakerOrderHash = headOrder.Item.Hash
tradeSDK.TxHash = txHash
tradeSDK.Hash = tradeSDK.ComputeHash()
log.Debug("TRADE history", "order", order, "trade", tradeSDK)
// put tradeSDK to mongodb on SDK node
if tomoXService.IsSDKNode() {
db := tomoXService.GetDB()
if err := db.Put(tomox.EmptyKey(), tradeSDK, true); err != nil {
return fmt.Errorf("failed to store tradeSDK %s", err.Error())
}
}
}
return nil
}
27 changes: 26 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package core

import (
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/tomox"
"io"
"math/big"
"os"
Expand Down Expand Up @@ -1203,6 +1205,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
if err = tomoXService.ApplyTxMatches(matchedOrderHashes); err != nil {
return i, events, coalescedLogs, err
}
if tomoXService.IsSDKNode() {
if err := logDataToSdkNode(tomoXService, block.Transactions()); err != nil {
return i, events, coalescedLogs, err
}
}
}

if bc.CurrentHeader().Number.Uint64()%common.TomoXSnapshotInterval == 0 {
Expand Down Expand Up @@ -1430,8 +1437,12 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L
if err = tomoXService.ApplyTxMatches(matchedOrderHashes); err != nil {
return events, coalescedLogs, err
}
if tomoXService.IsSDKNode() {
if err := logDataToSdkNode(tomoXService, block.Transactions()); err != nil {
return events, coalescedLogs, err
}
}
}

if bc.CurrentHeader().Number.Uint64()%common.TomoXSnapshotInterval == 0 {
if err := tomoXService.Snapshot(block.Hash()); err != nil {
log.Error("Failed to snapshot tomox", "err", err)
Expand Down Expand Up @@ -1928,3 +1939,17 @@ func (bc *BlockChain) UpdateM1() error {
return nil
}

func logDataToSdkNode(tomoXService *tomox.TomoX, transactions types.Transactions) error {
for _, tx := range transactions {
if tx.IsMatchingTransaction() {
txMatch := &tomox.TxDataMatch{}
if err := json.Unmarshal(tx.Data(), txMatch); err != nil {
return err
}
if err := tomoXService.SyncDataToSDKNode(txMatch, tx.Hash()); err != nil {
return err
}
}
}
return nil
}
57 changes: 12 additions & 45 deletions tomox/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package tomox
import (
"bytes"
"encoding/hex"
"time"
"errors"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/tomochain/tomox-sdk/types"
"github.com/hashicorp/golang-lru"
"github.com/tomochain/tomox-sdk/types"
"time"
)

type MongoItem struct {
Expand Down Expand Up @@ -52,10 +50,10 @@ func NewMongoDatabase(session *mgo.Session, mongoURL string, cacheLimit int) (*M
dryRunCache, _ := lru.New(itemCacheLimit)

db := &MongoDatabase{
Session: session,
dbName: dbName,
cacheItems: cacheItems,
dryRunCache: dryRunCache,
Session: session,
dbName: dbName,
cacheItems: cacheItems,
dryRunCache: dryRunCache,
}

return db, nil
Expand Down Expand Up @@ -133,7 +131,6 @@ func (db *MongoDatabase) Get(key []byte, val interface{}, dryrun bool) (interfac
return value, nil
}
}

if cached, ok := db.cacheItems.Get(cacheKey); ok && !dryrun {
return cached, nil
} else {
Expand Down Expand Up @@ -175,7 +172,7 @@ func (db *MongoDatabase) Put(key []byte, val interface{}, dryrun bool) error {
return nil
}

log.Debug("Debug DB put", "cacheKey", cacheKey, "val", val)
log.Debug("Debug DB put", "cacheKey", cacheKey, "val", val)
db.cacheItems.Add(cacheKey, val)

switch val.(type) {
Expand Down Expand Up @@ -243,43 +240,13 @@ func (db *MongoDatabase) Delete(key []byte, dryrun bool) error {
}

func (db *MongoDatabase) InitDryRunMode() {
log.Debug("Start dry-run mode, clear old data")
db.dryRunCache.Purge()
// SDK node (which running with mongodb) doesn't run Matching engine
// dry-run cache is useless for sdk node
}

//TODO: should use batch commit to avoid data inconsistency
func (db *MongoDatabase) SaveDryRunResult() error {

sc := db.Session.Copy()
defer sc.Close()

for _, cacheKey := range db.dryRunCache.Keys() {
key, err := hex.DecodeString(cacheKey.(string))
if err != nil {
log.Error("Can't save dry-run result (hex.DecodeString)", "err", err)
return err
}
val, ok := db.dryRunCache.Get(cacheKey)
if !ok {
err := errors.New("can't get item from dryrun cache")
log.Error("Can't save dry-run result (db.dryRunCache.Get)", "err", err)
return err
}
if val == nil {
//TODO: don't remove order item in mongo
if err := db.Delete(key,false); err != nil {
log.Error("Can't save dry-run result (db.Delete)", "err", err)
return err
}
continue
}
if err := db.Put(key, val, false); err != nil {
log.Error("Can't save dry-run result (db.Put)", "err", err)
return err
}
}
// purge cache data
db.dryRunCache.Purge()
// SDK node (which running with mongodb) doesn't run Matching engine
// dry-run cache is useless for sdk node
return nil
}

Expand All @@ -300,7 +267,7 @@ func (db *MongoDatabase) CommitOrder(cacheKey string, o *OrderItem) error {
return err
}

log.Debug("Save", "cacheKey", cacheKey, "value", ToJSON(o))
log.Debug("Save orderItem", "cacheKey", cacheKey, "value", ToJSON(o))

return nil
}
Expand Down
48 changes: 28 additions & 20 deletions tomox/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ func (orderBook *OrderBook) WorstAsk(dryrun bool) (value *big.Int) {
}

// processMarketOrder : process the market order
func (orderBook *OrderBook) processMarketOrder(order *OrderItem, verbose bool, dryrun bool) ([]map[string]string, error) {
func (orderBook *OrderBook) processMarketOrder(order *OrderItem, verbose bool, dryrun bool) ([]map[string]string, *OrderItem, error) {
var (
trades []map[string]string
newTrades []map[string]string
orderInBook *OrderItem
err error
)
quantityToTrade := order.Quantity
Expand All @@ -215,23 +216,23 @@ func (orderBook *OrderBook) processMarketOrder(order *OrderItem, verbose bool, d
if side == Bid {
for quantityToTrade.Cmp(zero) > 0 && orderBook.Asks.NotEmpty() {
bestPriceAsks := orderBook.Asks.MinPriceList(dryrun)
quantityToTrade, newTrades, err = orderBook.processOrderList(Ask, bestPriceAsks, quantityToTrade, order, verbose, dryrun)
quantityToTrade, newTrades, orderInBook, err = orderBook.processOrderList(Ask, bestPriceAsks, quantityToTrade, order, verbose, dryrun)
if err != nil {
return nil, err
return nil, orderInBook, err
}
trades = append(trades, newTrades...)
}
} else {
for quantityToTrade.Cmp(zero) > 0 && orderBook.Bids.NotEmpty() {
bestPriceBids := orderBook.Bids.MaxPriceList(dryrun)
quantityToTrade, newTrades, err = orderBook.processOrderList(Bid, bestPriceBids, quantityToTrade, order, verbose, dryrun)
quantityToTrade, newTrades, orderInBook, err = orderBook.processOrderList(Bid, bestPriceBids, quantityToTrade, order, verbose, dryrun)
if err != nil {
return nil, err
return nil, orderInBook, err
}
trades = append(trades, newTrades...)
}
}
return trades, nil
return trades, orderInBook, nil
}

// processLimitOrder : process the limit order, can change the quote
Expand All @@ -254,7 +255,7 @@ func (orderBook *OrderBook) processLimitOrder(order *OrderItem, verbose bool, dr
minPrice := orderBook.Asks.MinPrice(dryrun)
for quantityToTrade.Cmp(zero) > 0 && orderBook.Asks.NotEmpty() && price.Cmp(minPrice) >= 0 {
bestPriceAsks := orderBook.Asks.MinPriceList(dryrun)
quantityToTrade, newTrades, err = orderBook.processOrderList(Ask, bestPriceAsks, quantityToTrade, order, verbose, dryrun)
quantityToTrade, newTrades, orderInBook, err = orderBook.processOrderList(Ask, bestPriceAsks, quantityToTrade, order, verbose, dryrun)
if err != nil {
return nil, nil, err
}
Expand All @@ -276,7 +277,7 @@ func (orderBook *OrderBook) processLimitOrder(order *OrderItem, verbose bool, dr
maxPrice := orderBook.Bids.MaxPrice(dryrun)
for quantityToTrade.Cmp(zero) > 0 && orderBook.Bids.NotEmpty() && price.Cmp(maxPrice) <= 0 {
bestPriceBids := orderBook.Bids.MaxPriceList(dryrun)
quantityToTrade, newTrades, err = orderBook.processOrderList(Bid, bestPriceBids, quantityToTrade, order, verbose, dryrun)
quantityToTrade, newTrades, orderInBook, err = orderBook.processOrderList(Bid, bestPriceBids, quantityToTrade, order, verbose, dryrun)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -310,7 +311,7 @@ func (orderBook *OrderBook) ProcessOrder(order *OrderItem, verbose bool, dryrun
orderBook.Item.NextOrderID++

if orderType == Market {
trades, err = orderBook.processMarketOrder(order, verbose, dryrun)
trades, orderInBook, err = orderBook.processMarketOrder(order, verbose, dryrun)
if err != nil {
return nil, nil, err
}
Expand All @@ -330,40 +331,46 @@ func (orderBook *OrderBook) ProcessOrder(order *OrderItem, verbose bool, dryrun
}

// processOrderList : process the order list
func (orderBook *OrderBook) processOrderList(side string, orderList *OrderList, quantityStillToTrade *big.Int, order *OrderItem, verbose bool, dryrun bool) (*big.Int, []map[string]string, error) {
func (orderBook *OrderBook) processOrderList(side string, orderList *OrderList, quantityStillToTrade *big.Int, order *OrderItem, verbose bool, dryrun bool) (*big.Int, []map[string]string, *OrderItem, error) {
quantityToTrade := CloneBigInt(quantityStillToTrade)
var trades []map[string]string
var (
trades []map[string]string
orderInBook *OrderItem
)
// speedup the comparison, do not assign because it is pointer
zero := Zero()
for orderList.Item.Length > uint64(0) && quantityToTrade.Cmp(zero) > 0 {

headOrder := orderList.GetOrder(orderList.Item.HeadOrder, dryrun)
if headOrder == nil {
return nil, nil, fmt.Errorf("headOrder is null")
return nil, nil, nil, fmt.Errorf("headOrder is null")
}

tradedPrice := CloneBigInt(headOrder.Item.Price)

var newBookQuantity *big.Int
var tradedQuantity *big.Int
var (
newBookQuantity *big.Int
tradedQuantity *big.Int
)

if IsStrictlySmallerThan(quantityToTrade, headOrder.Item.Quantity) {
tradedQuantity = CloneBigInt(quantityToTrade)
// Do the transaction
newBookQuantity = Sub(headOrder.Item.Quantity, quantityToTrade)
if err := headOrder.UpdateQuantity(orderList, newBookQuantity, headOrder.Item.UpdatedAt, dryrun); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
quantityToTrade = Zero()
orderInBook = headOrder.Item
} else if IsEqual(quantityToTrade, headOrder.Item.Quantity) {
tradedQuantity = CloneBigInt(quantityToTrade)
if side == Bid {
if err := orderBook.Bids.RemoveOrderFromOrderList(headOrder, orderList, dryrun); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
} else {
if err := orderBook.Asks.RemoveOrderFromOrderList(headOrder, orderList, dryrun); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}
quantityToTrade = Zero()
Expand All @@ -372,11 +379,11 @@ func (orderBook *OrderBook) processOrderList(side string, orderList *OrderList,
tradedQuantity = CloneBigInt(headOrder.Item.Quantity)
if side == Bid {
if err := orderBook.Bids.RemoveOrderFromOrderList(headOrder, orderList, dryrun); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
} else {
if err := orderBook.Asks.RemoveOrderFromOrderList(headOrder, orderList, dryrun); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}
quantityToTrade = Sub(quantityToTrade, tradedQuantity)
Expand All @@ -387,6 +394,7 @@ func (orderBook *OrderBook) processOrderList(side string, orderList *OrderList,
}

transactionRecord := make(map[string]string)
transactionRecord["orderHash"] = hex.EncodeToString(headOrder.Item.Hash.Bytes())
transactionRecord["timestamp"] = strconv.FormatUint(orderBook.Timestamp, 10)
transactionRecord["quantity"] = tradedQuantity.String()
transactionRecord["exAddr"] = headOrder.Item.ExchangeAddress.String()
Expand All @@ -396,7 +404,7 @@ func (orderBook *OrderBook) processOrderList(side string, orderList *OrderList,

trades = append(trades, transactionRecord)
}
return quantityToTrade, trades, nil
return quantityToTrade, trades, orderInBook, nil
}

// CancelOrder : cancel the order, just need ID, side and price, of course order must belong
Expand Down
Loading

0 comments on commit a1f9df1

Please sign in to comment.