Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add refetch blocks and transactions fix command #248

Merged
merged 57 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
b629fd2
Register NewTxsCmd
MonikaCat Oct 20, 2021
c21bf20
Add txs.go
MonikaCat Oct 20, 2021
b71403f
register blocks fix
MonikaCat Oct 22, 2021
3244813
Added block fix
MonikaCat Oct 22, 2021
f6d9e50
Update insert
MonikaCat Oct 22, 2021
6ee8d84
Updated db saving method
MonikaCat Oct 25, 2021
91b20a6
Updated block refetching
MonikaCat Oct 25, 2021
64f995e
Add: Get starting height from config file, refetch only missing block…
MonikaCat Oct 25, 2021
5859c9c
Cleanup
MonikaCat Oct 25, 2021
33bd098
Merge branch 'v2/cosmos/stargate' of https://github.com/forbole/bdjun…
MonikaCat Oct 27, 2021
f0a1c2b
Add: UpdateTxInDatabase
MonikaCat Nov 8, 2021
a23ef8c
Update refreshTxs method
MonikaCat Nov 9, 2021
585e906
Rm handleBlock
MonikaCat Nov 10, 2021
a806ce1
Added distrModule
MonikaCat Nov 10, 2021
0892c56
Updated tx db saving values
MonikaCat Nov 12, 2021
5f6724d
Updated db tx values
MonikaCat Nov 15, 2021
60e11bd
Add store msgs in message table
MonikaCat Nov 15, 2021
ff62300
Rm error check from event search
MonikaCat Nov 15, 2021
cd09c81
Update txDetails type
MonikaCat Nov 15, 2021
b28f7da
Merge branch 'v2/cosmos/stargate' of https://github.com/forbole/bdjun…
MonikaCat Nov 15, 2021
334d784
Merge branch 'm/fix-missing-blocks' of https://github.com/forbole/bdj…
MonikaCat Nov 15, 2021
3e6c4ef
Update register gov module
MonikaCat Nov 15, 2021
ee44ca3
Store accounts first in saveUpToDateBalances
MonikaCat Nov 17, 2021
ae3b556
Check if delegations exists otherwise return nil
MonikaCat Nov 17, 2021
09bb440
Update command
MonikaCat Nov 17, 2021
30bd7fd
Added RemoveDuplicateAccountBalance method
MonikaCat Nov 18, 2021
fbc2fcd
Updated ErrDelegationNotFound msg
MonikaCat Nov 18, 2021
1bada96
Update handleMsgCreateValidator
MonikaCat Nov 19, 2021
c6344cb
Move ErrNotFound to utils/errors.go
MonikaCat Nov 19, 2021
5fc45fc
Updated delegation error handling
MonikaCat Nov 19, 2021
371912e
Merge branch 'v2/cosmos/stargate' of https://github.com/forbole/bdjun…
MonikaCat Nov 19, 2021
93aa091
Add expected modules
MonikaCat Nov 19, 2021
2f2bcf7
Update registrar.go
MonikaCat Nov 19, 2021
753212f
Cleanup, add HandleMessages
MonikaCat Nov 19, 2021
a69de74
Update consensus module registrar
MonikaCat Nov 22, 2021
74bc35f
Removed unused source
MonikaCat Nov 22, 2021
8e3da73
Moved UpdateTxs
MonikaCat Nov 22, 2021
0457ebb
Removed unused db param
MonikaCat Nov 22, 2021
98547fb
Update refetching desc
MonikaCat Nov 22, 2021
479633b
Update cdm desc
MonikaCat Nov 22, 2021
0682fb6
Removed RemoveDuplicateAccountBalance
MonikaCat Nov 22, 2021
bee7e28
Cleanup
MonikaCat Nov 22, 2021
e95d007
Rm unused modules from expected_modules.go
MonikaCat Nov 22, 2021
ffc8b33
lint
MonikaCat Nov 22, 2021
2a7c18a
Cleanup
MonikaCat Nov 23, 2021
d46a684
Update block, tx and msg storing methods
MonikaCat Dec 1, 2021
543cdb3
Update: read startHeight from config file
MonikaCat Dec 1, 2021
d7192a6
Merge branch 'v2/cosmos/stargate' of https://github.com/forbole/bdjun…
MonikaCat Dec 1, 2021
08881ee
Update registered staking module
MonikaCat Dec 1, 2021
8788376
Rv RemoveDuplicateAccountBalance
MonikaCat Dec 6, 2021
6806a11
Rm error code check
MonikaCat Dec 6, 2021
6f01b71
Merge branch 'v2/cosmos/stargate' of https://github.com/forbole/bdjun…
MonikaCat Dec 6, 2021
8195ad4
Add worker to process missing blocks
MonikaCat Dec 13, 2021
7030101
Updated registrar
MonikaCat Dec 13, 2021
12db700
Merge branch 'v2/cosmos/stargate' of https://github.com/forbole/bdjun…
MonikaCat Dec 13, 2021
6228bc7
Revert err msg
MonikaCat Dec 15, 2021
378cf53
Merge branch 'v2/cosmos/stargate' into m/fix-missing-blocks
mergify[bot] Dec 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions cmd/fix/blocks/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package blocks

import (
"encoding/hex"
"fmt"
"strings"

"github.com/forbole/bdjuno/v2/database"
"github.com/forbole/bdjuno/v2/modules"
"github.com/forbole/bdjuno/v2/modules/bank"
"github.com/forbole/bdjuno/v2/modules/consensus"
"github.com/forbole/bdjuno/v2/modules/distribution"
"github.com/forbole/bdjuno/v2/modules/gov"
"github.com/forbole/bdjuno/v2/modules/history"
"github.com/forbole/bdjuno/v2/modules/staking"
"github.com/forbole/bdjuno/v2/utils"
"github.com/forbole/juno/v2/cmd/parse"
junomessages "github.com/forbole/juno/v2/modules/messages"
"github.com/forbole/juno/v2/types/config"
"github.com/spf13/cobra"
tmctypes "github.com/tendermint/tendermint/rpc/core/types"
)

// blocksCmd returns a Cobra command that allows to fix missing blocks in database
func blocksCmd(parseConfig *parse.Config) *cobra.Command {
MonikaCat marked this conversation as resolved.
Show resolved Hide resolved
return &cobra.Command{
Use: "refetch",
Short: "Fix missing blocks and transactions in database from the start height",
RunE: func(cmd *cobra.Command, args []string) error {
parseCtx, err := parse.GetParsingContext(parseConfig)
if err != nil {
return err
}

sources, err := modules.BuildSources(config.Cfg.Node, parseCtx.EncodingConfig)
if err != nil {
return err
}

// Get the database
db := database.Cast(parseCtx.Database)

// Register modules
bankModule := bank.NewModule(junomessages.BankMessagesParser, sources.BankSource, parseCtx.EncodingConfig.Marshaler, db)
distrModule := distribution.NewModule(config.Cfg, sources.DistrSource, bankModule, parseCtx.EncodingConfig.Marshaler, db)
historyModule := history.NewModule(config.Cfg.Chain, junomessages.BankMessagesParser, parseCtx.EncodingConfig.Marshaler, db)
stakingModule := staking.NewModule(sources.StakingSource, bankModule, distrModule, historyModule, nil, parseCtx.EncodingConfig.Marshaler, db)
govModule := gov.NewModule(sources.GovSource, nil, bankModule, distrModule, nil, nil, stakingModule, parseCtx.EncodingConfig.Marshaler, db)

// Build the consensus module
consensusModule := consensus.NewModule(bankModule, distrModule, govModule, stakingModule, db)

// Get latest height
height, err := parseCtx.Node.LatestHeight()
if err != nil {
return fmt.Errorf("error while getting chain latest block height: %s", err)
}

k := config.Cfg.Parser.StartHeight
fmt.Printf("Refetching missing blocks and transactions from height %d ... \n", k)
for ; k <= height; k++ {
missingBlock := consensusModule.IsBlockMissing(k)
if missingBlock {
fmt.Printf("Refetching block %d ... \n", k)
err = refreshBlock(parseCtx, k, consensusModule)
if err != nil {
return err
}
}
}

return nil
},
}
}

func refreshBlock(parseCtx *parse.Context, blockHeight int64, consensusModule *consensus.Module) error {
// Get the block details
block, err := utils.QueryBlock(parseCtx.Node, blockHeight)
if err != nil {
return err
}

err = consensusModule.UpdateBlock(block)

if len(block.Block.Txs) != 0 {
for _, tx := range block.Block.Txs {
fmt.Printf("Refetching tx %v ... \n", strings.ToUpper(hex.EncodeToString(tx.Hash())))
err = refreshTxs(parseCtx, consensusModule, block)
if err != nil {
return fmt.Errorf("error when updatig tx %s", err)
}
}
}
if err != nil {
return fmt.Errorf("error while updating block %d: %s", blockHeight, err)
}

return nil
}

func refreshTxs(parseCtx *parse.Context, consensusModule *consensus.Module, block *tmctypes.ResultBlock) error {

for _, tx := range block.Block.Txs {
// Get the tx details
txDetails, err := parseCtx.Node.Tx(hex.EncodeToString(tx.Hash()))
if err != nil {
return fmt.Errorf("error while encoding tx to string %s", err)
}

err = consensusModule.HandleMessages(txDetails)
if err != nil {
return fmt.Errorf("error when updating tx message tx %s", err)
}
}

return nil
}
20 changes: 20 additions & 0 deletions cmd/fix/blocks/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package blocks

import (
"github.com/forbole/juno/v2/cmd/parse"
"github.com/spf13/cobra"
)

// NewBlocksCmd returns the Cobra command that allows to fix all the things related to blocks
func NewBlocksCmd(parseConfig *parse.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "blocks",
Short: "Fix things related to blocks and transactions",
}

cmd.AddCommand(
blocksCmd(parseConfig),
)

return cmd
}
2 changes: 2 additions & 0 deletions cmd/fix/fix.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/spf13/cobra"

fixauth "github.com/forbole/bdjuno/v2/cmd/fix/auth"
fixblocks "github.com/forbole/bdjuno/v2/cmd/fix/blocks"
fixgov "github.com/forbole/bdjuno/v2/cmd/fix/gov"
fixstaking "github.com/forbole/bdjuno/v2/cmd/fix/staking"
)
Expand All @@ -19,6 +20,7 @@ func NewFixCmd(parseCfg *parse.Config) *cobra.Command {

cmd.AddCommand(
fixauth.NewAuthCmd(parseCfg),
fixblocks.NewBlocksCmd(parseCfg),
fixgov.NewGovCmd(parseCfg),
fixstaking.NewStakingCmd(parseCfg),
)
Expand Down
14 changes: 13 additions & 1 deletion database/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

dbtypes "github.com/forbole/bdjuno/v2/database/types"
"github.com/forbole/bdjuno/v2/utils"

dbutils "github.com/forbole/bdjuno/v2/database/utils"

Expand Down Expand Up @@ -40,23 +41,34 @@ func (db *Db) SaveAccountBalances(balances []types.AccountBalance) error {
func (db *Db) saveUpToDateBalances(paramsNumber int, balances []types.AccountBalance) error {
stmt := `INSERT INTO account_balance (address, coins, height) VALUES `
var params []interface{}
var accounts []types.Account
// remove duplicate values of accounts balance
balances = utils.RemoveDuplicateAccountBalance(balances)

for i, bal := range balances {

bi := i * paramsNumber
stmt += fmt.Sprintf("($%d, $%d, $%d),", bi+1, bi+2, bi+3)
accounts = append(accounts, types.NewAccount(bal.Address))

coins := pq.Array(dbtypes.NewDbCoins(bal.Balance))
params = append(params, bal.Address, coins, bal.Height)
}

// Store the accounts
err := db.SaveAccounts(accounts)
if err != nil {
return fmt.Errorf("error while storing account balance : %s", err)
}

stmt = stmt[:len(stmt)-1]
stmt += `
ON CONFLICT (address) DO UPDATE
SET coins = excluded.coins,
height = excluded.height
WHERE account_balance.height <= excluded.height`

_, err := db.Sql.Exec(stmt, params...)
_, err = db.Sql.Exec(stmt, params...)
if err != nil {
return fmt.Errorf("error while storing up-to-date balances: %s", err)
}
Expand Down
88 changes: 88 additions & 0 deletions database/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"fmt"
"time"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/forbole/bdjuno/v2/types"
"github.com/lib/pq"

dbtypes "github.com/forbole/bdjuno/v2/database/types"
"github.com/forbole/bdjuno/v2/utils"
junotypes "github.com/forbole/juno/v2/types"
tmctypes "github.com/tendermint/tendermint/rpc/core/types"
)

// GetLastBlock returns the last block stored inside the database based on the heights
Expand Down Expand Up @@ -184,3 +189,86 @@ func (db *Db) GetGenesis() (*types.Genesis, error) {
row := rows[0]
return types.NewGenesis(row.ChainID, row.Time, row.InitialHeight), nil
}

// CheckIfBlockIsMissing checks if block is already stored in database
func (db *Db) CheckIfBlockIsMissing(height int64) bool {
var block []dbtypes.BlockRow
stmt := `SELECT * FROM block WHERE height = $1`

err := db.Sqlx.Select(&block, stmt, height)
if err != nil {
return true
}
if len(block) != 0 {
return false
}

return true
}

// UpdateBlocksInDatabase updates given block in database
func (db *Db) UpdateBlockInDatabase(block *tmctypes.ResultBlock) error {
junoBlock := junotypes.NewBlockFromTmBlock(block, uint64(block.Block.Height))
err := db.SaveBlock(junoBlock)
if err != nil {
return fmt.Errorf("error while storing block %d, error: %s", block.Block.Height, err)
}
return nil
}

// UpdateTxInDatabase updates transactions for a given block in database
func (db *Db) UpdateTxInDatabase(i int, tx *junotypes.Tx) error {
err := db.SaveTx(tx)
if err != nil {
return fmt.Errorf("error while storing tx in database: %s", err)
}
err = db.UpdateMsgsInDatabase(tx, i)
if err != nil {
return fmt.Errorf("error while updating tx message in database: %s", err)
}

return nil
}

// UpdateMsgsInDatabase updates messages for a given block in database
func (db *Db) UpdateMsgsInDatabase(tx *junotypes.Tx, i int) error {
var eventTypes []string
var involvedAccounts []string
var attributeKeys []string
message, err := codec.ProtoMarshalJSON(tx.Body.Messages[i], nil)
if err != nil {
return fmt.Errorf("error while marshaling tx message: %s", err)
}

for _, event := range tx.Logs {
for _, eventType := range event.Events {
eventTypess := eventType.Type
eventTypes = append(eventTypes, eventTypess)
attributes := eventType.Attributes
for _, attribute := range attributes {
attributeKeys = append(attributeKeys, attribute.Key)
}
}
}
attributeKeys = utils.RemoveDuplicateValues(attributeKeys)

for _, eventType := range eventTypes {
event, _ := tx.FindEventByType(i, eventType)
for _, key := range attributeKeys {
address, _ := tx.FindAttributeByKey(event, key)
// process only addresses
if len(address) >= 40 {
involvedAccounts = append(involvedAccounts, address)
}
}
involvedAccounts = utils.RemoveDuplicateValues(involvedAccounts)
}

junoMsg := junotypes.NewMessage(tx.TxHash, i, tx.Body.Messages[i].TypeUrl, string(message), pq.StringArray(involvedAccounts))
err = db.SaveMessage(junoMsg)
if err != nil {
return fmt.Errorf("error while storing tx message in database, error: %s", err)
}

return nil
}
2 changes: 1 addition & 1 deletion database/staking_delegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ INSERT INTO delegation (validator_address, delegator_address, amount, height) VA
// Get the validator consensus address
consAddr, err := db.GetValidatorConsensusAddress(delegation.ValidatorOperAddr)
if err != nil {
return fmt.Errorf("error while gettting validator consensus address: %s", err)
return err
MonikaCat marked this conversation as resolved.
Show resolved Hide resolved
}

// Convert the amount
Expand Down
22 changes: 22 additions & 0 deletions modules/consensus/expected_modules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package consensus

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/forbole/juno/v2/types"
)

type BankModule interface {
HandleMsg(_ int, msg sdk.Msg, tx *types.Tx) error
}

type DistrModule interface {
HandleMsg(_ int, msg sdk.Msg, tx *types.Tx) error
}

type GovModule interface {
HandleMsg(_ int, msg sdk.Msg, tx *types.Tx) error
}

type StakingModule interface {
HandleMsg(_ int, msg sdk.Msg, tx *types.Tx) error
}
18 changes: 15 additions & 3 deletions modules/consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,25 @@ var (

// Module implements the consensus utils
type Module struct {
db *database.Db
db *database.Db
bankModule BankModule
distrModule DistrModule
govModule GovModule
stakingModule StakingModule
}

// NewModule builds a new Module instance
func NewModule(db *database.Db) *Module {
func NewModule(bankModule BankModule,
distrModule DistrModule,
govModule GovModule,
stakingModule StakingModule,
db *database.Db) *Module {
return &Module{
db: db,
bankModule: bankModule,
distrModule: distrModule,
govModule: govModule,
stakingModule: stakingModule,
db: db,
}
}

Expand Down
Loading