Skip to content

Commit

Permalink
Merge pull request #3434 from TrueBlocks/develop
Browse files Browse the repository at this point in the history
Update to Version 2.5.0 - Much better concurrent code
  • Loading branch information
tjayrush authored Dec 2, 2023
2 parents 4e530b4 + 30e21a3 commit be29ddb
Show file tree
Hide file tree
Showing 95 changed files with 5,023 additions and 1,615 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ info:
license:
name: GPL 3.0
url: http://www.gnu.org/licenses/
version: 2.2.0-release
version: 2.5.0-release
description: >
A REST layer over the TrueBlocks application. With `chifra daemon`, you can
Expand Down
12 changes: 11 additions & 1 deletion src/apps/chifra/internal/blocks/handle_decache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (opts *BlocksOptions) HandleDecache() error {

ctx := context.Background()
fetchData := func(modelChan chan types.Modeler[types.RawModeler], errorChan chan error) {
if msg, err := decache.Decache(opts.Conn, itemsToRemove, silent, walk.Cache_Blocks); err != nil {
if msg, err := decache.Decache(opts.Conn, itemsToRemove, silent, opts.getCacheType()); err != nil {
errorChan <- err
} else {
s := types.SimpleMessage{
Expand All @@ -36,3 +36,13 @@ func (opts *BlocksOptions) HandleDecache() error {
opts.Globals.NoHeader = true
return output.StreamMany(ctx, fetchData, opts.Globals.OutputOpts())
}

func (opts *BlocksOptions) getCacheType() walk.CacheType {
cT := walk.Cache_Blocks
if opts.Logs {
cT = walk.Cache_Logs
} else if opts.Traces {
cT = walk.Cache_Traces
}
return cT
}
103 changes: 53 additions & 50 deletions src/apps/chifra/internal/blocks/handle_hashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ package blocksPkg

import (
"context"
"errors"
"fmt"
"sort"

"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/identifiers"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/output"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/types"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils"
"github.com/ethereum/go-ethereum"
)

func (opts *BlocksOptions) HandleHashes() error {
Expand All @@ -24,62 +23,66 @@ func (opts *BlocksOptions) HandleHashes() error {

ctx, cancel := context.WithCancel(context.Background())
fetchData := func(modelChan chan types.Modeler[types.RawBlock], errorChan chan error) {
// var cnt int
var err error
var appMap map[types.SimpleAppearance]*types.SimpleBlock[string]
if appMap, _, err = identifiers.AsMap[types.SimpleBlock[string]](chain, opts.BlockIds); err != nil {
if sliceOfMaps, cnt, err := identifiers.AsSliceOfMaps[types.SimpleBlock[string]](chain, opts.BlockIds); err != nil {
errorChan <- err
cancel()
}

bar := logger.NewBar(logger.BarOptions{
Type: logger.Expanding,
Enabled: !opts.Globals.TestMode,
Total: int64(len(appMap)),
})
} else if cnt == 0 {
errorChan <- fmt.Errorf("no blocks found for the query")
cancel()

} else {
bar := logger.NewBar(logger.BarOptions{
Enabled: !testMode && !utils.IsTerminal(),
Total: int64(cnt),
})

iterFunc := func(app types.SimpleAppearance, value *types.SimpleBlock[string]) error {
bn := uint64(app.BlockNumber)
if block, err := opts.Conn.GetBlockHeaderByNumber(bn); err != nil {
errorChan <- err
if errors.Is(err, ethereum.NotFound) {
errorChan <- errors.New("uncles not found")
for _, thisMap := range sliceOfMaps {
thisMap := thisMap
for app := range thisMap {
thisMap[app] = new(types.SimpleBlock[string])
}
cancel()
return nil
} else {
bar.Tick()
*value = block
}
return nil
}

iterErrorChan := make(chan error)
iterCtx, iterCancel := context.WithCancel(context.Background())
defer iterCancel()
go utils.IterateOverMap(iterCtx, iterErrorChan, appMap, iterFunc)
for err := range iterErrorChan {
if !testMode || nErrors == 0 {
errorChan <- err
nErrors++
}
}
bar.Finish(true)
items := make([]*types.SimpleBlock[string], 0, len(thisMap))
iterFunc := func(app types.SimpleAppearance, value *types.SimpleBlock[string]) error {
bn := uint64(app.BlockNumber)
if block, err := opts.Conn.GetBlockHeaderByNumber(bn); err != nil {
delete(thisMap, app)
return err
} else {
*value = block
bar.Tick()
}
return nil
}

items := make([]*types.SimpleBlock[string], 0, len(appMap))
for _, item := range appMap {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
if items[i].BlockNumber == items[j].BlockNumber {
return items[i].Hash.Hex() < items[j].Hash.Hex()
}
return items[i].BlockNumber < items[j].BlockNumber
})
iterErrorChan := make(chan error)
iterCtx, iterCancel := context.WithCancel(context.Background())
defer iterCancel()
go utils.IterateOverMap(iterCtx, iterErrorChan, thisMap, iterFunc)
for err := range iterErrorChan {
if !testMode || nErrors == 0 {
errorChan <- err
nErrors++
}
}

for _, item := range items {
item := item
modelChan <- item
for _, item := range thisMap {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
if items[i].BlockNumber == items[j].BlockNumber {
return items[i].Hash.Hex() < items[j].Hash.Hex()
}
return items[i].BlockNumber < items[j].BlockNumber
})

for _, item := range items {
item := item
modelChan <- item
}
}
bar.Finish(true /* newLine */)
}
}

Expand Down
139 changes: 73 additions & 66 deletions src/apps/chifra/internal/blocks/handle_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,85 +39,92 @@ func (opts *BlocksOptions) HandleLogs() error {

ctx, cancel := context.WithCancel(context.Background())
fetchData := func(modelChan chan types.Modeler[types.RawLog], errorChan chan error) {
// var cnt int
var err error
var appMap map[types.SimpleAppearance]*types.SimpleTransaction
if appMap, _, err = identifiers.AsMap[types.SimpleTransaction](chain, opts.BlockIds); err != nil {
if sliceOfMaps, cnt, err := identifiers.AsSliceOfMaps[types.SimpleTransaction](chain, opts.BlockIds); err != nil {
errorChan <- err
cancel()
}

bar := logger.NewBar(logger.BarOptions{
Enabled: !opts.Globals.TestMode,
Total: int64(len(appMap)),
})
} else if cnt == 0 {
errorChan <- fmt.Errorf("no blocks found for the query")
cancel()

iterFunc := func(app types.SimpleAppearance, value *types.SimpleTransaction) error {
if value.Receipt == nil {
value.Receipt = &types.SimpleReceipt{}
}
} else {
bar := logger.NewBar(logger.BarOptions{
Enabled: !testMode && !utils.IsTerminal(),
Total: int64(cnt),
})

for _, thisMap := range sliceOfMaps {
thisMap := thisMap
for app := range thisMap {
thisMap[app] = new(types.SimpleTransaction)
}

bn := uint64(app.BlockNumber)
ts := opts.Conn.GetBlockTimestamp(bn)
if logs, err := opts.Conn.GetLogsByNumber(bn, ts); err != nil {
errorChan <- fmt.Errorf("block at %d returned an error: %w", bn, err)
return nil

} else if len(logs) == 0 {
errorChan <- fmt.Errorf("block at %d has no logs", bn)
return nil

} else {
l := make([]types.SimpleLog, 0, len(logs))
for index := range logs {
if opts.Articulate {
if err = abiCache.ArticulateLog(&logs[index]); err != nil {
errorChan <- err // continue even with an error
iterFunc := func(app types.SimpleAppearance, value *types.SimpleTransaction) error {
if value.Receipt == nil {
value.Receipt = &types.SimpleReceipt{}
}

bn := uint64(app.BlockNumber)
ts := opts.Conn.GetBlockTimestamp(bn)
if logs, err := opts.Conn.GetLogsByNumber(bn, ts); err != nil {
delete(thisMap, app)
return fmt.Errorf("block at %d returned an error: %w", bn, err)

} else if len(logs) == 0 {
delete(thisMap, app)
return fmt.Errorf("block at %d has no logs", bn)

} else {
l := make([]types.SimpleLog, 0, len(logs))
for index := range logs {
if opts.Articulate {
if err = abiCache.ArticulateLog(&logs[index]); err != nil {
errorChan <- err // continue even with an error
}
}
l = append(l, logs[index])
}
value.Receipt.Logs = append(value.Receipt.Logs, l...)
bar.Tick()
return nil
}
l = append(l, logs[index])
}
bar.Tick()
value.Receipt.Logs = append(value.Receipt.Logs, l...)
}
return nil
}

iterErrorChan := make(chan error)
iterCtx, iterCancel := context.WithCancel(context.Background())
defer iterCancel()
go utils.IterateOverMap(iterCtx, iterErrorChan, appMap, iterFunc)
for err := range iterErrorChan {
if !testMode || nErrors == 0 {
errorChan <- err
// Reporting more than one error causes tests to fail because they
// appear concurrently so sort differently
nErrors++
}
}
bar.Finish(true)
iterErrorChan := make(chan error)
iterCtx, iterCancel := context.WithCancel(context.Background())
defer iterCancel()
go utils.IterateOverMap(iterCtx, iterErrorChan, thisMap, iterFunc)
for err := range iterErrorChan {
if !testMode || nErrors == 0 {
errorChan <- err
nErrors++
}
}

items := make([]types.SimpleLog, 0, len(appMap))
for _, tx := range appMap {
tx := tx
items = append(items, tx.Receipt.Logs...)
}
sort.Slice(items, func(i, j int) bool {
if items[i].BlockNumber == items[j].BlockNumber {
if items[i].TransactionIndex == items[j].TransactionIndex {
return items[i].LogIndex < items[j].LogIndex
items := make([]types.SimpleLog, 0, len(thisMap))
for _, tx := range thisMap {
tx := tx
items = append(items, tx.Receipt.Logs...)
}
return items[i].TransactionIndex < items[j].TransactionIndex
}
return items[i].BlockNumber < items[j].BlockNumber
})
sort.Slice(items, func(i, j int) bool {
if items[i].BlockNumber == items[j].BlockNumber {
if items[i].TransactionIndex == items[j].TransactionIndex {
return items[i].LogIndex < items[j].LogIndex
}
return items[i].TransactionIndex < items[j].TransactionIndex
}
return items[i].BlockNumber < items[j].BlockNumber
})

for _, item := range items {
item := item
if !logFilter.PassesFilter(&item) {
continue
for _, item := range items {
item := item
if !logFilter.PassesFilter(&item) {
continue
}
modelChan <- &item
}
}
modelChan <- &item
bar.Finish(true /* newLine */)
}
}

Expand Down
Loading

0 comments on commit be29ddb

Please sign in to comment.