Merge pull request #69 from fschoell/feature/flush_configurations
Added new flags to configure flush intervals
maoueh authored Jan 6, 2025
2 parents 0d571f9 + 3459d75 commit c0a7a10
Showing 10 changed files with 70 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## v4.4.0

* Added support for the Clickhouse `Date` type.
* Added more flags to configure flushing intervals. Available flags are `batch-block-flush-interval`, `batch-row-flush-interval` and `live-block-flush-interval`.
* Deprecated the existing `flush-interval` flag in favor of `batch-block-flush-interval`.

* Fixed handling of the Clickhouse `Array` type.

6 changes: 4 additions & 2 deletions cmd/substreams-sink-sql/common_flags.go
@@ -57,13 +57,15 @@ func extractSinkConfig(pkg *pbsubstreams.Package) (*pbsql.Service, error) {
func newDBLoader(
cmd *cobra.Command,
psqlDSN string,
flushInterval time.Duration,
batchBlockFlushInterval int,
batchRowFlushInterval int,
liveBlockFlushInterval int,
handleReorgs bool,
) (*db.Loader, error) {
moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag))
cli.NoError(err, "invalid mistmatch mode")

dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer)
dbLoader, err := db.NewLoader(psqlDSN, batchBlockFlushInterval, batchRowFlushInterval, liveBlockFlushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer)
if err != nil {
return nil, fmt.Errorf("new psql loader: %w", err)
}
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/create_user.go
@@ -45,7 +45,7 @@ func createUserE(cmd *cobra.Command, args []string) error {
}

if err := retry(ctx, func(ctx context.Context) error {
dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/generate_csv.go
@@ -107,7 +107,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, 0, false) // flush interval not used in CSV mode
dbLoader, err := newDBLoader(cmd, dsn, 0, 0, 0, false) // flush interval not used in CSV mode
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
12 changes: 10 additions & 2 deletions cmd/substreams-sink-sql/run.go
@@ -28,7 +28,10 @@ var sinkRunCmd = Command(sinkRunE,
AddCommonSinkerFlags(flags)

flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.")
flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks")
flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.")
flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval. Ineffective if the sink is now in the live portion of the chain where only 'live-block-flush-interval' applies.")
flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks.")
flags.Int("flush-interval", 0, "(deprecated) please use --batch-block-flush-interval instead")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")
}),
Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' [email protected]"),
@@ -85,7 +88,12 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"), handleReorgs)
batchBlockFlushInterval := sflags.MustGetInt(cmd, "batch-block-flush-interval")
if sflags.MustGetInt(cmd, "flush-interval") != 0 {
batchBlockFlushInterval = sflags.MustGetInt(cmd, "flush-interval")
}

dbLoader, err := newDBLoader(cmd, dsn, batchBlockFlushInterval, sflags.MustGetInt(cmd, "batch-row-flush-interval"), sflags.MustGetInt(cmd, "live-block-flush-interval"), handleReorgs)
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
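Taken together, the new flags behave as follows while the sink is catching up: a flush is triggered every `batch-block-flush-interval` blocks or once `batch-row-flush-interval` rows are buffered, whichever comes first, and setting either flag to 0 disables that trigger; once the sink reaches the live portion of the chain, only `live-block-flush-interval` applies. The deprecated `flush-interval` flag is still honored: when non-zero it overrides `batch-block-flush-interval`, as the fallback in sinkRunE above shows. A purely illustrative invocation of the new flags (the DSN, endpoint and manifest below are placeholders, not part of this change):

substreams-sink-sql run \
  --batch-block-flush-interval 500 \
  --batch-row-flush-interval 50000 \
  --live-block-flush-interval 1 \
  -e mainnet.eth.streamingfast.io:443 \
  'postgres://localhost:5432/postgres?sslmode=disable' \
  [email protected]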
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/setup.go
@@ -46,7 +46,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("extract sink config: %w", err)
}

dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/tools.go
@@ -145,7 +145,7 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error {

func toolsCreateLoader() *db.Loader {
dsn := viper.GetString("tools-global-dsn")
loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer)
loader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer)
cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn)

if err := loader.LoadTables(); err != nil {
55 changes: 37 additions & 18 deletions db/db.go
@@ -4,8 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"time"

"github.com/jimsmart/schema"
"github.com/streamingfast/logging"
orderedmap "github.com/wk8/go-ordered-map/v2"
@@ -39,9 +37,11 @@ type Loader struct {
tables map[string]*TableInfo
cursorTable *TableInfo

handleReorgs bool
flushInterval time.Duration
moduleMismatchMode OnModuleHashMismatch
handleReorgs bool
batchBlockFlushInterval int
batchRowFlushInterval int
liveBlockFlushInterval int
moduleMismatchMode OnModuleHashMismatch

logger *zap.Logger
tracer logging.Tracer
@@ -51,7 +51,9 @@

func NewLoader(
psqlDsn string,
flushInterval time.Duration,
batchBlockFlushInterval int,
batchRowFlushInterval int,
liveBlockFlushInterval int,
moduleMismatchMode OnModuleHashMismatch,
handleReorgs *bool,
logger *zap.Logger,
@@ -68,15 +70,17 @@ func NewLoader(
}

l := &Loader{
DB: db,
database: dsn.database,
schema: dsn.schema,
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
tables: map[string]*TableInfo{},
flushInterval: flushInterval,
moduleMismatchMode: moduleMismatchMode,
logger: logger,
tracer: tracer,
DB: db,
database: dsn.database,
schema: dsn.schema,
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
tables: map[string]*TableInfo{},
batchBlockFlushInterval: batchBlockFlushInterval,
batchRowFlushInterval: batchRowFlushInterval,
liveBlockFlushInterval: liveBlockFlushInterval,
moduleMismatchMode: moduleMismatchMode,
logger: logger,
tracer: tracer,
}
_, err = l.tryDialect()
if err != nil {
@@ -95,7 +99,9 @@ }
}

logger.Info("created new DB loader",
zap.Duration("flush_interval", flushInterval),
zap.Int("batch_block_flush_interval", batchBlockFlushInterval),
zap.Int("batch_row_flush_interval", batchRowFlushInterval),
zap.Int("live_block_flush_interval", liveBlockFlushInterval),
zap.String("driver", dsn.driver),
zap.String("database", dsn.database),
zap.String("schema", dsn.schema),
@@ -129,8 +135,21 @@ func (l *Loader) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) {
return l.DB.BeginTx(ctx, opts)
}

func (l *Loader) FlushInterval() time.Duration {
return l.flushInterval
func (l *Loader) BatchBlockFlushInterval() int {
return l.batchBlockFlushInterval
}

func (l *Loader) LiveBlockFlushInterval() int {
return l.liveBlockFlushInterval
}

func (l *Loader) FlushNeeded() bool {
totalRows := 0
// todo keep a running count when inserting/deleting rows directly
for pair := l.entries.Oldest(); pair != nil; pair = pair.Next() {
totalRows += pair.Value.Len()
}
return totalRows > l.batchRowFlushInterval
}

func (l *Loader) LoadTables() error {
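For code that constructs the loader directly, the three intervals now come positionally right after the DSN, replacing the old `time.Duration` flush interval. A minimal sketch of the new call shape, assuming the module's import path and an already configured zap logger and streamingfast tracer (the DSN and interval values are illustrative, not recommendations):

package example

import (
	"fmt"

	"github.com/streamingfast/logging"
	"github.com/streamingfast/substreams-sink-sql/db"
	"go.uber.org/zap"
)

// newExampleLoader shows only the updated NewLoader call shape; it is not part
// of this commit.
func newExampleLoader(zlog *zap.Logger, tracer logging.Tracer) (*db.Loader, error) {
	handleReorgs := true
	loader, err := db.NewLoader(
		"psql://user:pass@localhost:5432/mydb?sslmode=disable", // illustrative DSN
		500,    // batchBlockFlushInterval: flush every 500 blocks in catch-up mode
		50_000, // batchRowFlushInterval: or once 50,000 rows are buffered, whichever comes first
		1,      // liveBlockFlushInterval: flush every block once live
		db.OnModuleHashMismatchError,
		&handleReorgs,
		zlog,
		tracer,
	)
	if err != nil {
		return nil, fmt.Errorf("new psql loader: %w", err)
	}
	return loader, nil
}

Call sites that never buffer rows (setup, create-user, the tools loader and CSV generation) simply pass 0, 0, 0, as the updated calls above show.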
2 changes: 1 addition & 1 deletion db/testing.go
Expand Up @@ -15,7 +15,7 @@ func NewTestLoader(
tables map[string]*TableInfo,
) (*Loader, *TestTx) {

loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
loader, err := NewLoader("psql://x:5432/x", 0, 0, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
if err != nil {
panic(err)
}
22 changes: 12 additions & 10 deletions sinker/sinker.go
@@ -17,10 +17,7 @@ import (
"google.golang.org/protobuf/proto"
)

const (
HISTORICAL_BLOCK_FLUSH_EACH = 1000
LIVE_BLOCK_FLUSH_EACH = 1
)
const BLOCK_FLUSH_INTERVAL_DISABLED = 0

type SQLSinker struct {
*shutter.Shutter
@@ -121,8 +118,13 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
}
}

if data.Clock.Number%s.batchBlockModulo(isLive) == 0 {
s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive))
if (s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0) || s.loader.FlushNeeded() {
s.logger.Debug("flushing to database",
zap.Stringer("block", cursor.Block()),
zap.Bool("is_live", *isLive),
zap.Bool("block_flush_interval_reached", s.batchBlockModulo(isLive) > 0 && data.Clock.Number%s.batchBlockModulo(isLive) == 0),
zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()),
)

flushStart := time.Now()
rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight)
@@ -219,12 +221,12 @@ func (s *SQLSinker) batchBlockModulo(isLive *bool) uint64 {
}

if *isLive {
return LIVE_BLOCK_FLUSH_EACH
return uint64(s.loader.LiveBlockFlushInterval())
}

if s.loader.FlushInterval() > 0 {
return uint64(s.loader.FlushInterval())
if s.loader.BatchBlockFlushInterval() > 0 {
return uint64(s.loader.BatchBlockFlushInterval())
}

return HISTORICAL_BLOCK_FLUSH_EACH
return BLOCK_FLUSH_INTERVAL_DISABLED
}
