Skip to content
This repository has been archived by the owner on Oct 15, 2024. It is now read-only.

[R4R]add option to enable range query for tx indexer;add option to disable websocket #115

Merged
merged 4 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## v0.31.5-binance.2
*Sep 6th, 2019*
### FEATURES:
- [config] [\#115](https://github.com/binance-chain/bnc-tendermint/pull/115) add option to enable range query for tx indexer;add option to disable websocket
- [sync] [\#97](https://github.com/binance-chain/bnc-tendermint/pull/97) supoort hot sync reactor

### IMPROVEMENTS:
Expand Down
15 changes: 12 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ type RPCConfig struct {
// 1024 - 40 - 10 - 50 = 924 = ~900
MaxOpenConnections int `mapstructure:"max_open_connections"`

// Websocket handler will be disabled if set true
DisableWebsocket bool `mapstructure:"disable_websocket"`

// Maximum number of go routine to process websocket request.
// 1 - process websocket request synchronously.
// 10 - default size.
Expand Down Expand Up @@ -428,6 +431,7 @@ func DefaultRPCConfig() *RPCConfig {
GRPCMaxOpenConnections: 900,

Unsafe: false,
DisableWebsocket: false,
MaxOpenConnections: 900,
WebsocketPoolMaxSize: 10,
WebsocketPoolQueueSize: 10,
Expand Down Expand Up @@ -975,6 +979,10 @@ type TxIndexConfig struct {
// 2) "kv" (default) - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend).
Indexer string `mapstructure:"indexer"`

// Operator ["<", ">", ">=", "<="] belongs to range query operator.
// Notice: only enable it in trust environment.
EnableRangeQuery bool `mapstructure:"enable_range_query"`

// Comma-separated list of tags to index (by default the only tag is "tx.hash")
//
// You can also index transactions by height by adding "tx.height" tag here.
Expand Down Expand Up @@ -1008,9 +1016,10 @@ type BlockIndexConfig struct {
// DefaultTxIndexConfig returns a default configuration for the transaction indexer.
func DefaultTxIndexConfig() *TxIndexConfig {
return &TxIndexConfig{
Indexer: "kv",
IndexTags: "",
IndexAllTags: false,
Indexer: "kv",
EnableRangeQuery: false,
IndexTags: "",
IndexAllTags: false,
}
}

Expand Down
6 changes: 6 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ unsafe = {{ .RPC.Unsafe }}
# 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = {{ .RPC.MaxOpenConnections }}

# Websocket handler will be disabled if set true
disable_websocket = {{ .RPC.DisableWebsocket }}

# Maximum number of go routine to process websocket request.
# 1 - process websocket request synchronously.
Expand Down Expand Up @@ -387,6 +389,10 @@ blocktime_iota = "{{ .Consensus.BlockTimeIota }}"
# 2) "kv" (default) - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend).
indexer = "{{ .TxIndex.Indexer }}"

# Operator ["<", ">", ">=", "<="] belongs to range query operator.
# Notice: only enable it in trust environment.
enable_range_query = {{ .TxIndex.EnableRangeQuery }}

# Comma-separated list of tags to index (by default the only tag is "tx.hash")
#
# You can also index transactions by height by adding "tx.height" tag here.
Expand Down
1 change: 0 additions & 1 deletion libs/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func NewDB(name string, backend DBBackendType, dir string) DB {
return NewDBWithOpt(name, backend, dir, nil)
}


// NewDB creates a new database of type backend with the given name.
// NOTE: function panics if:
// - backend is unknown (not registered)
Expand Down
19 changes: 19 additions & 0 deletions libs/pubsub/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,25 @@ const (
OpContains
)

func (o Operator) String() string {
switch o {
case OpLessEqual:
return "<="
case OpGreaterEqual:
return ">="
case OpLess:
return "<"
case OpGreater:
return ">"
case OpEqual:
return "="
case OpContains:
return "CONTAINS"
default:
return "UNKNOWN"
}
}

const (
// DateLayout defines a layout for all dates (`DATE date`)
DateLayout = "2006-01-02"
Expand Down
17 changes: 12 additions & 5 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,16 @@ func NewNode(config *cfg.Config,
if err != nil {
return nil, err
}
indexOptions := make([]func(index *kv.TxIndex), 0)
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
indexOptions = append(indexOptions, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
indexOptions = append(indexOptions, kv.IndexAllTags())
}
if config.TxIndex.EnableRangeQuery {
indexOptions = append(indexOptions, kv.EnableRangeQuery())
}
txIndexer = kv.NewTxIndex(store, indexOptions...)
default:
txIndexer = &null.TxIndex{}
}
Expand Down Expand Up @@ -783,7 +786,11 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}
}), rpcserver.SetWorkerPool(wsWorkerPool))
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
if n.config.RPC.DisableWebsocket {
mux.HandleFunc("/websocket", wm.WebsocketDisabledHandler)
} else {
mux.HandleFunc("/websocket", wm.WebsocketHandler)
}
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)

config := rpcserver.DefaultConfig()
Expand Down
2 changes: 2 additions & 0 deletions rpc/core/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
// SubscribeTimeout is the maximum time we wait to subscribe for an event.
// must be less than the server's write timeout (see rpcserver.DefaultConfig)
SubscribeTimeout = 5 * time.Second

MaxTxSearchQueryLength = 1024
)

//----------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions rpc/core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error
// - `hash`: `[]byte` - hash of the transaction
func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
// if index is disabled, return error
if len(query) > MaxTxSearchQueryLength {
return nil, fmt.Errorf("query length %d exceed the max lenght %d", len(query), MaxTxSearchQueryLength)
}
if _, ok := txIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("Transaction indexing is disabled")
}
Expand Down
5 changes: 5 additions & 0 deletions rpc/lib/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,11 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
}
}

func (wm *WebsocketManager) WebsocketDisabledHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("websocket disabled"))
}

// rpc.websocket
//-----------------------------------------------------------------------------

Expand Down
2 changes: 2 additions & 0 deletions rpc/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func StopTendermint(node *nm.Node) {
func NewTendermint(app abci.Application) *nm.Node {
// Create & start node
config := GetConfig()
// change default config for test
config.TxIndex.EnableRangeQuery = true
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
logger = log.NewFilter(logger, log.AllowError())
pvKeyFile := config.PrivValidatorKeyFile()
Expand Down
3 changes: 1 addition & 2 deletions state/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package state
import (
"sync"

abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/types"
abci "github.com/tendermint/tendermint/abci/types"

)

const (
Expand Down
2 changes: 1 addition & 1 deletion state/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestCountDown(t *testing.T) {
indexDb := db.NewMemDB()

// start tx index
txIndexer := kv.NewTxIndex(indexDb, kv.IndexAllTags())
txIndexer := kv.NewTxIndex(indexDb, kv.IndexAllTags(), kv.EnableRangeQuery())
txIndexSvc := txindex.NewIndexerService(txIndexer, eventBus)
txIndexSvc.SetLogger(log.TestingLogger())
err = txIndexSvc.Start()
Expand Down
2 changes: 1 addition & 1 deletion state/txindex/indexer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {

// tx indexer
store := db.NewMemDB()
txIndexer := kv.NewTxIndex(store, kv.IndexAllTags())
txIndexer := kv.NewTxIndex(store, kv.IndexAllTags(), kv.EnableRangeQuery())

service := txindex.NewIndexerService(txIndexer, eventBus)
service.SetLogger(log.TestingLogger())
Expand Down
18 changes: 15 additions & 3 deletions state/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ var _ txindex.TxIndexer = (*TxIndex)(nil)

// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
type TxIndex struct {
store dbm.DB
tagsToIndex []string
indexAllTags bool
store dbm.DB
tagsToIndex []string
indexAllTags bool
enableRangeQuery bool
}

// NewTxIndex creates new KV indexer.
Expand All @@ -54,6 +55,13 @@ func IndexAllTags() func(*TxIndex) {
}
}

// EnableRangeQuery is an option for enable range query.
func EnableRangeQuery() func(*TxIndex) {
return func(txi *TxIndex) {
txi.enableRangeQuery = true
}
}

// Get gets transaction from the TxIndex storage and returns it or nil if the
// transaction is not found.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
Expand Down Expand Up @@ -169,6 +177,10 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
// if both upper and lower bounds exist, it's better to get them in order not
Copy link
Contributor

@rickyyangz rickyyangz Sep 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on second thought, we may check strings.ContainsAny(s, "><") before we parse the query conditions.

// no iterate over kvs that are not within range.
ranges, rangeIndexes := lookForRanges(conditions)
if !txi.enableRangeQuery && len(rangeIndexes) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to find the operator, just print the query

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

return nil, fmt.Errorf("range query is not supported by this node, detected invalid operators['>', '<', '<=', '>='] in the query statement")
}

if len(ranges) > 0 {
skipIndexes = append(skipIndexes, rangeIndexes...)

Expand Down
17 changes: 13 additions & 4 deletions state/txindex/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestTxIndex(t *testing.T) {

func TestTxSearch(t *testing.T) {
allowedTags := []string{"account.number", "account.owner", "account.date"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags), EnableRangeQuery())

txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("1")},
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestTxSearch(t *testing.T) {

func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
allowedTags := []string{"account.number"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags), EnableRangeQuery())

txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("1")},
Expand All @@ -129,7 +129,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {

func TestTxSearchMultipleTxs(t *testing.T) {
allowedTags := []string{"account.number", "account.number.id"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags), EnableRangeQuery())

// indexed first, but bigger height (to test the order of transactions)
txResult := txResultWithTags([]cmn.KVPair{
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
}

func TestIndexAllTags(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB(), IndexAllTags())
indexer := NewTxIndex(db.NewMemDB(), IndexAllTags(), EnableRangeQuery())

txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.owner"), Value: []byte("Ivan")},
Expand All @@ -202,6 +202,15 @@ func TestIndexAllTags(t *testing.T) {
assert.Equal(t, []*types.TxResult{txResult}, results)
}

func TestDisableRangeQuery(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB(), IndexAllTags())

_, err := indexer.Search(query.MustParse("account.number >= 1"))
assert.Error(t, err)
_, err = indexer.Search(query.MustParse("account.number >= 1 AND account.sequence < 100 AND tx.height > 200 AND tx.height <= 300"))
assert.Error(t, err)
}

func txResultWithTags(tags []cmn.KVPair) *types.TxResult {
tx := types.Tx("HELLO WORLD")
return &types.TxResult{
Expand Down