diff --git a/CHANGELOG.md b/CHANGELOG.md index 81c216cfe6cc..a1e787f90e35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,12 +49,20 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### API Breaking Changes +* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Update State Streaming APIs: + * Add method `ListenCommit` to `ABCIListener` + * Move `ListeningEnabled` and `AddListeners` methods to `CommitMultiStore` + * Remove `CacheWrapWithListeners` from `CacheWrap` and `CacheWrapper` interfaces + * Remove listening APIs from the caching layer (it should only listen to the `rootmulti.Store`) + * Add three new options to the file streaming service constructor. + * Modify `ABCIListener` such that any error from any method will always halt the app via `panic` * (store) [#13529](https://github.com/cosmos/cosmos-sdk/pull/13529) Add method `LatestVersion` to `MultiStore` interface, add method `SetQueryMultiStore` to baseapp to support an alternative `MultiStore` implementation for the query service. ### Bug Fixes * (baseapp) [#13983](https://github.com/cosmos/cosmos-sdk/pull/13983) Don't emit duplicate ante-handler events when a post-handler is defined. * (baseapp) [#14049](https://github.com/cosmos/cosmos-sdk/pull/14049) Fix state sync when interval is zero. +* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Fix state listener that was observing writes at the wrong time. ## [v0.46.6](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.46.6) - 2022-11-18 diff --git a/baseapp/abci.go b/baseapp/abci.go index 231eb7dcf6d9..da4bab3fb7cd 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -230,8 +230,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg // call the hooks with the BeginBlock messages for _, streamingListener := range app.abciListeners { - goCtx := sdk.WrapSDKContext(app.deliverState.ctx) - if err := streamingListener.ListenBeginBlock(goCtx, req, res); err != nil { + if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil { panic(fmt.Errorf("BeginBlock listening hook failed, height: %d, err: %w", req.Header.Height, err)) } } @@ -260,8 +259,7 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc // call the streaming service hooks with the EndBlock messages for _, streamingListener := range app.abciListeners { - goCtx := sdk.WrapSDKContext(app.deliverState.ctx) - if err := streamingListener.ListenEndBlock(goCtx, req, res); err != nil { + if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil { panic(fmt.Errorf("EndBlock listening hook failed, height: %d, err: %w", req.Height, err)) } } @@ -317,7 +315,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv defer func() { for _, streamingListener := range app.abciListeners { if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil { - app.logger.Error("DeliverTx listening hook failed", "err", err) + panic(fmt.Errorf("DeliverTx listening hook failed: %w", err)) } } }() @@ -354,8 +352,6 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv // against that height and gracefully halt if it matches the latest committed // height. 
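For reference, the `ABCIListener` shape implied by the changelog entries and the `baseapp/abci.go` hunks above can be sketched as follows; this is a hedged reconstruction from this diff (the package name and exact signatures are assumptions, not the verbatim SDK definition):

```go
package baseapp

import (
	"context"

	abci "github.com/tendermint/tendermint/abci/types"
)

// ABCIListener (sketch): hooks now receive the raw ABCI requests/responses,
// including the new ListenCommit hook; per the changelog, any returned error
// halts the app via panic.
type ABCIListener interface {
	ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
	ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
	ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
	ListenCommit(ctx context.Context, res abci.ResponseCommit) error
}
```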
func (app *BaseApp) Commit() abci.ResponseCommit { - defer telemetry.MeasureSince(time.Now(), "abci", "commit") - header := app.deliverState.ctx.BlockHeader() retainHeight := app.GetBlockRetentionHeight(header.Height) @@ -372,8 +368,7 @@ func (app *BaseApp) Commit() abci.ResponseCommit { // call the hooks with the Commit message for _, streamingListener := range app.abciListeners { - goCtx := sdk.WrapSDKContext(app.deliverState.ctx) - if err := streamingListener.ListenCommit(goCtx, res); err != nil { + if err := streamingListener.ListenCommit(app.deliverState.ctx, res); err != nil { panic(fmt.Errorf("Commit listening hook failed, height: %d, err: %w", header.Height, err)) } } @@ -407,10 +402,7 @@ func (app *BaseApp) Commit() abci.ResponseCommit { app.halt() } - var snapshotHeight int64 - if app.snapshotInterval > 0 && uint64(header.Height)%app.snapshotInterval == 0 { - snapshotHeight = header.Height - } + go app.snapshotManager.SnapshotIfApplicable(header.Height) return res } diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index bcd7945cd07b..ed245c41d884 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -47,15 +47,8 @@ func NewMemoryListener() *MemoryListener { return &MemoryListener{} } -// OnWrite writes state change events to the internal cache -func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) { - fl.stateCache = append(fl.stateCache, StoreKVPair{ - StoreKey: storeKey.Name(), - Delete: delete, - Key: key, - Value: value, - }) -} +We will create two concrete implementations of the `WriteListener` interface in `store/types/listening.go`, that writes out protobuf +encoded KV pairs to an underlying `io.Writer`, and simply accumulate them in memory. We will create two concrete implementations of the `WriteListener` interface in `store/types/listening.go`, that writes out protobuf encoded KV pairs to an underlying `io.Writer`, and simply accumulate them in memory. @@ -301,14 +294,8 @@ func (app *BaseApp) SetStreamingService(s StreamingService) { } ``` -Implementing the service above: - -```go -// streaming/plugins/abci/{plugin_version}/grpc.go - -var ( - _ baseapp.ABCIListener = (*GRPCClient)(nil) -) +We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any streaming service hooks registered +with the `BaseApp`. defer func() { // call the hooks with the BeginBlock messages @@ -325,11 +312,14 @@ func (m *GRPCClient) ListenFinalizeBlock(goCtx context.Context, req abci.Request return err } -func (m *GRPCClient) ListenCommit(goCtx context.Context, res abci.ResponseCommit, changeSet []store.StoreKVPair) error { - ctx := sdk.UnwrapSDKContext(goCtx) - _, err := m.client.ListenCommit(ctx, &ListenCommitRequest{BlockHeight: ctx.BlockHeight(), Res: res, ChangeSet: changeSet}) - return err -} + defer func() { + // call the hooks with the BeginBlock messages + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil { + panic(sdkerrors.Wrapf(err, "BeginBlock listening hook failed, height: %d", req.Header.Height)) + } + } + }() // GRPCServer is the gRPC server that GRPCClient talks to. 
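As a concrete illustration of the in-memory `WriteListener` flavor described above, a minimal listener could look like the sketch below; the type and package names are illustrative, while `StoreKey` and `StoreKVPair` come from `store/types`:

```go
package listening

import (
	storetypes "github.com/cosmos/cosmos-sdk/store/types"
)

// memoryListener is a simplified, in-memory WriteListener in the spirit of
// store/types.MemoryListener: it accumulates StoreKVPairs until they are
// popped by the streaming service at commit time.
type memoryListener struct {
	key        storetypes.StoreKey
	stateCache []storetypes.StoreKVPair
}

var _ storetypes.WriteListener = (*memoryListener)(nil)

// OnWrite records a single key/value write (or delete) for the listener's store.
func (l *memoryListener) OnWrite(storeKey storetypes.StoreKey, key, value []byte, delete bool) error {
	l.stateCache = append(l.stateCache, storetypes.StoreKVPair{
		StoreKey: storeKey.Name(),
		Delete:   delete,
		Key:      key,
		Value:    value,
	})
	return nil
}

// PopStateCache hands back the accumulated pairs and resets the cache.
func (l *memoryListener) PopStateCache() []storetypes.StoreKVPair {
	res := l.stateCache
	l.stateCache = nil
	return res
}
```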
type GRPCServer struct { @@ -362,12 +352,37 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv } }() -And the pre-compiled Go plugin `Impl`(*this is only used for plugins that are written in Go*): + defer func() { + // Call the streaming service hooks with the EndBlock messages + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil { + panic(sdkerrors.Wrapf(err, "EndBlock listening hook failed, height: %d", req.Height)) + } + } + }() return res } ``` +```go +func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) { + + defer func() { + // call the hooks with the DeliverTx messages + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil { + panic(sdkerrors.Wrap(err, "DeliverTx listening hook failed")) + } + } + }() + + ... + + app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID)) + ... +} + ```golang func (app *BaseApp) Commit() abci.ResponseCommit { header := app.deliverState.ctx.BlockHeader() @@ -394,6 +409,7 @@ func (app *BaseApp) Commit() abci.ResponseCommit { app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID)) ... } +``` #### Error Handling And Async Consumers @@ -477,33 +493,7 @@ func NewSimApp( baseAppOptions ...func(*baseapp.BaseApp), ) *SimApp { - ... - - keys := sdk.NewKVStoreKeys( - authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, - minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, - govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey, - evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, - ) - - ... - - // register streaming services - streamingCfg := cast.ToStringMap(appOpts.Get(baseapp.StreamingTomlKey)) - for service := range streamingCfg { - pluginKey := fmt.Sprintf("%s.%s.%s", baseapp.StreamingTomlKey, service, baseapp.StreamingPluginTomlKey) - pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey))) - if len(pluginName) > 0 { - logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel)) - plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel) - if err != nil { - tmos.Exit(err.Error()) - } - if err := baseapp.RegisterStreamingPlugin(bApp, appOpts, keys, plugin); err != nil { - tmos.Exit(err.Error()) - } - } - } + ... keys := sdk.NewKVStoreKeys( authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, diff --git a/server/config/config.go b/server/config/config.go index 244b8acfb61d..ecbf61b26079 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -174,6 +174,15 @@ type ( Keys []string `mapstructure:"keys"` WriteDir string `mapstructure:"write_dir"` Prefix string `mapstructure:"prefix"` + // OutputMetadata specifies whether to write the block metadata file, which includes + // the ABCI requests/responses; otherwise only the data file is written. + OutputMetadata bool `mapstructure:"output-metadata"` + // StopNodeOnError specifies whether to propagate the streamer errors to the consensus + // state machine; it is necessary for the integrity of the output data. + StopNodeOnError bool `mapstructure:"stop-node-on-error"` + // Fsync specifies whether to call fsync after writing the files; it slows down + // the commit but prevents data loss in the event of a system crash. 
+ Fsync bool `mapstructure:"fsync"` } ) @@ -251,7 +260,13 @@ func DefaultConfig() *Config { }, Streamers: StreamersConfig{ File: FileStreamerConfig{ - Keys: []string{"*"}, + Keys: []string{"*"}, + WriteDir: "data/file_streamer", + OutputMetadata: true, + StopNodeOnError: true, + // NOTICE: the default config does not protect the streamer data integrity + // in the event of a system crash. + Fsync: false, }, }, } diff --git a/server/config/toml.go b/server/config/toml.go index 98ca20ccb5f5..327f64d151a5 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -230,6 +230,13 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}] keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}] write_dir = "{{ .Streamers.File.WriteDir }}" prefix = "{{ .Streamers.File.Prefix }}" +# output-metadata specifies whether to write the metadata file, which includes the abci requests/responses +# observed while processing the block. +output-metadata = "{{ .Streamers.File.OutputMetadata }}" +# stop-node-on-error specifies whether to propagate the file streamer errors to the consensus state machine. +stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}" +# fsync specifies whether to call fsync after writing the files. +fsync = "{{ .Streamers.File.Fsync }}" ` var configTemplate *template.Template diff --git a/store/cachekv/store.go b/store/cachekv/store.go index 58412d5beb7e..9f5884eeddc3 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -9,7 +9,6 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/internal/conv" - "github.com/cosmos/cosmos-sdk/store/cachekv/internal" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/telemetry" diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go index c65cdd8a39b3..a082588db268 100644 --- a/store/cachemulti/store.go +++ b/store/cachemulti/store.go @@ -41,9 +41,6 @@ func NewFromKVStore( store types.KVStore, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, ) Store { - if listeners == nil { - listeners = make(map[types.StoreKey][]types.WriteListener) - } cms := Store{ db: cachekv.NewStore(store), stores: make(map[types.StoreKey]types.CacheWrap, len(stores)), @@ -81,8 +78,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store { stores[k] = v } - // don't pass listeners to nested cache store. - return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, nil) + return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext) } // SetTracer sets the tracer for the MultiStore that the underlying @@ -113,23 +109,6 @@ func (cms Store) TracingEnabled() bool { return cms.traceWriter != nil } -// AddListeners adds listeners for a specific KVStore -func (cms Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) { - if ls, ok := cms.listeners[key]; ok { - cms.listeners[key] = append(ls, listeners...)
- } else { - cms.listeners[key] = listeners - } -} - -// ListeningEnabled returns if listening is enabled for a specific KVStore -func (cms Store) ListeningEnabled(key types.StoreKey) bool { - if ls, ok := cms.listeners[key]; ok { - return len(ls) != 0 - } - return false -} - // LatestVersion returns the branch version of the store func (cms Store) LatestVersion() int64 { panic("cannot get latest version from branch cached multi-store") diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index ef711a0d5e29..783712d7ed59 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -243,7 +243,10 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { // If it was deleted, remove all data if upgrades.IsDeleted(key.Name()) { - deleteKVStore(store.(types.KVStore)) + if err := deleteKVStore(types.KVStore(store)); err != nil { + return errors.Wrapf(err, "failed to delete store %s", key.Name()) + } + rs.removalMap[key] = true } else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" { // handle renames specially // make an unregistered key to satisfy loadCommitStore params @@ -257,7 +260,14 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { } // move all data - moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)) + if err := moveKVStoreData(types.KVStore(oldStore), types.KVStore(store)); err != nil { + return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name()) + } + + // add the old key so its deletion is committed + newStores[oldKey] = oldStore + // this will ensure it's not perpetually stored in commitInfo + rs.removalMap[oldKey] = true } } diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go index 02b5f34f472c..e5616cd728a6 100644 --- a/store/streaming/constructor.go +++ b/store/streaming/constructor.go @@ -103,7 +103,7 @@ func NewFileStreamingService( fileDir = path.Join(homePath, fileDir) } - // try to create output directory if it does not exist + // try to create the output directory if it does not exist. if _, err := os.Stat(fileDir); os.IsNotExist(err) { if err = os.MkdirAll(fileDir, os.ModePerm); err != nil { return nil, err diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index 63313b3b5d8e..1ac074fd07db 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path" "sort" @@ -30,40 +31,29 @@ type StreamingService struct { currentBlockNumber int64 blockMetadata types.BlockMetadata - - // outputMetadata, if true, writes additional metadata to file per block + // outputMetadata, if true, writes the block metadata file as well; otherwise only the data file is written. outputMetadata bool - - // stopNodeOnErr, if true, will panic and stop the node during ABCI Commit - // to ensure eventual consistency of the output, otherwise, any errors are - // logged and ignored which could yield data loss in streamed output. + // stopNodeOnErr, if true, panics and stops the consensus state machine when the commit write fails, to ensure the + // eventual consistency of the output; otherwise the error is ignored, at the risk of losing data. stopNodeOnErr bool - - // fsync, if true, will execute file Sync to make sure the data is persisted - // onto disk, otherwise there is a risk of data loss during any crash. + // fsync, if true, calls file.Sync() to make sure the data is persisted onto disk; otherwise data may be lost on a system crash. 
fsync bool } -func NewStreamingService( - writeDir, filePrefix string, - storeKeys []types.StoreKey, - cdc codec.BinaryCodec, - outputMetadata, stopNodeOnErr, fsync bool, -) (*StreamingService, error) { +// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys +func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) { // sort storeKeys for deterministic output sort.SliceStable(storeKeys, func(i, j int) bool { return storeKeys[i].Name() < storeKeys[j].Name() }) - // NOTE: We use the same listener for each store. listeners := make([]*types.MemoryListener, len(storeKeys)) + // in this case, we are using the same listener for each Store for i, key := range storeKeys { listeners[i] = types.NewMemoryListener(key) } - - // Check that the writeDir exists and is writable so that we can catch the - // error here at initialization. If it is not we don't open a dstFile until we - // receive our first ABCI message. + // check that the writeDir exists and is writable so that we can catch the error here at initialization, since + // we don't open a dstFile until we receive our first ABCI message if err := isDirWriteable(writeDir); err != nil { return nil, err } @@ -72,7 +62,7 @@ func NewStreamingService( storeListeners: listeners, filePrefix: filePrefix, writeDir: writeDir, - codec: cdc, + codec: c, outputMetadata: outputMetadata, stopNodeOnErr: stopNodeOnErr, fsync: fsync, @@ -87,65 +77,56 @@ func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListene for _, listener := range fss.storeListeners { listeners[listener.StoreKey()] = []types.WriteListener{listener} } - return listeners } -// ListenBeginBlock satisfies the ABCIListener interface. It sets the received -// BeginBlock request, response and the current block number. Note, these are -// not written to file until ListenCommit is executed and outputMetadata is set, -// after which it will be reset again on the next block. -func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { +// ListenBeginBlock satisfies the baseapp.ABCIListener interface. +// It caches the received BeginBlock request and response; the staged data is written +// out to a file, following the naming schema described above, when ListenCommit runs. +func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (rerr error) { fss.blockMetadata.RequestBeginBlock = &req fss.blockMetadata.ResponseBeginBlock = &res fss.currentBlockNumber = req.Header.Height return nil } -// ListenDeliverTx satisfies the ABCIListener interface. It appends the received -// DeliverTx request and response to a list of DeliverTxs objects. Note, these -// are not written to file until ListenCommit is executed and outputMetadata is -// set, after which it will be reset again on the next block. 
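A hedged usage sketch of the constructor above, with the three new options (`outputMetadata`, `stopNodeOnErr`, `fsync`) spelled out in positional order; the helper name and the surrounding wiring are assumptions for illustration, not SDK API:

```go
package app

import (
	"github.com/cosmos/cosmos-sdk/codec"
	"github.com/cosmos/cosmos-sdk/store/streaming/file"
	"github.com/cosmos/cosmos-sdk/store/types"
)

// newFileStreamer is an illustrative helper showing how the file streaming
// service might be constructed with the options introduced in this change.
func newFileStreamer(cdc codec.BinaryCodec, keys []types.StoreKey) (*file.StreamingService, error) {
	return file.NewStreamingService(
		"data/file_streamer", // writeDir (the default added in server/config)
		"",                   // filePrefix (optional)
		keys,                 // store keys to listen on
		cdc,                  // codec used to marshal the StoreKVPairs
		true,                 // outputMetadata: also write the block metadata file
		true,                 // stopNodeOnErr: propagate streamer errors to consensus
		false,                // fsync: skip fsync, trading crash safety for faster commits
	)
}
```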
-func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { +// ListenDeliverTx satisfies the baseapp.ABCIListener interface. +// It caches the received DeliverTx request and response; the staged data is written +// out to a file, following the naming schema described above, when ListenCommit runs. +func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (rerr error) { fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{ Request: &req, Response: &res, }) - return nil } -// ListenEndBlock satisfies the ABCIListener interface. It sets the received -// EndBlock request, response and the current block number. Note, these are -// not written to file until ListenCommit is executed and outputMetadata is set, -// after which it will be reset again on the next block. -func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { +// ListenEndBlock satisfies the baseapp.ABCIListener interface. +// It caches the received EndBlock request and response; the staged data is written +// out to a file, following the naming schema described above, when ListenCommit runs. +func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (rerr error) { fss.blockMetadata.RequestEndBlock = &req fss.blockMetadata.ResponseEndBlock = &res return nil } -// ListenCommit satisfies the ABCIListener interface. It is executed during the -// ABCI Commit request and is responsible for writing all staged data to files. -// It will only return a non-nil error when stopNodeOnErr is set. +// ListenCommit satisfies the baseapp.ABCIListener interface; it writes the staged block data to files and only returns a non-nil error when stopNodeOnErr is set. func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error { - if err := fss.doListenCommit(ctx, res); err != nil { + err := fss.doListenCommit(ctx, res) + if err != nil { if fss.stopNodeOnErr { return err } } - return nil } func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.ResponseCommit) (err error) { fss.blockMetadata.ResponseCommit = &res - // Write to target files, the file size is written at the beginning, which can - // be used to detect completeness. + // write to the target files; the file size is written at the beginning of each file, so it can be used to detect completeness. 
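Because `doListenCommit` prefixes each output file with an 8-byte big-endian length (see `writeLengthPrefixedFile` further down), a consumer can use that prefix to detect truncated files. A minimal reader, using only the standard library and written here purely as an illustration, might look like:

```go
package main

import (
	"encoding/binary"
	"errors"
	"os"
)

// readLengthPrefixed loads a file produced by the file streamer, verifies that the
// payload length matches the 8-byte big-endian prefix, and returns the payload.
func readLengthPrefixed(path string) ([]byte, error) {
	bz, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	if len(bz) < 8 {
		return nil, errors.New("file too short to contain a length prefix")
	}
	size := binary.BigEndian.Uint64(bz[:8])
	if uint64(len(bz)-8) != size {
		return nil, errors.New("incomplete file")
	}
	return bz[8:], nil
}
```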
metaFileName := fmt.Sprintf("block-%d-meta", fss.currentBlockNumber) dataFileName := fmt.Sprintf("block-%d-data", fss.currentBlockNumber) - if fss.filePrefix != "" { metaFileName = fmt.Sprintf("%s-%s", fss.filePrefix, metaFileName) dataFileName = fmt.Sprintf("%s-%s", fss.filePrefix, dataFileName) @@ -156,7 +137,6 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon if err != nil { return err } - if err := writeLengthPrefixedFile(path.Join(fss.writeDir, metaFileName), bz, fss.fsync); err != nil { return err } @@ -166,26 +146,32 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon if err := fss.writeBlockData(&buf); err != nil { return err } - return writeLengthPrefixedFile(path.Join(fss.writeDir, dataFileName), buf.Bytes(), fss.fsync) } func (fss *StreamingService) writeBlockData(writer io.Writer) error { for _, listener := range fss.storeListeners { cache := listener.PopStateCache() - for i := range cache { bz, err := fss.codec.MarshalLengthPrefixed(&cache[i]) if err != nil { return err } - - if _, err := writer.Write(bz); err != nil { + if _, err = writer.Write(bz); err != nil { return err } } } + return nil +} + +// Stream satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { + return nil +} +// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Close() error { return nil } @@ -213,30 +199,25 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) { if err != nil { return sdkerrors.Wrapf(err, "open file failed: %s", path) } - defer func() { // avoid overriding the real error with file close error if err1 := f.Close(); err1 != nil && err == nil { err = sdkerrors.Wrapf(err, "close file failed: %s", path) } }() - _, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data)))) if err != nil { return sdkerrors.Wrapf(err, "write length prefix failed: %s", path) } - _, err = f.Write(data) if err != nil { return sdkerrors.Wrapf(err, "write block data failed: %s", path) } - if fsync { err = f.Sync() if err != nil { return sdkerrors.Wrapf(err, "fsync failed: %s", path) } } - - return err + return } diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go index c0f6b5968400..66b1d16d074d 100644 --- a/store/streaming/file/service_test.go +++ b/store/streaming/file/service_test.go @@ -119,17 +119,14 @@ func TestFileStreamingService(t *testing.T) { defer os.RemoveAll(testDir) testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2} - var err error testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, true, false, false) require.Nil(t, err) require.IsType(t, &StreamingService{}, testStreamingService) require.Equal(t, testPrefix, testStreamingService.filePrefix) require.Equal(t, testDir, testStreamingService.writeDir) require.Equal(t, testMarshaller, testStreamingService.codec) - testListener1 = testStreamingService.storeListeners[0] testListener2 = testStreamingService.storeListeners[1] - wg := new(sync.WaitGroup) testStreamingService.Stream(wg) @@ -139,10 +136,7 @@ func TestFileStreamingService(t *testing.T) { } func testListenBlock(t *testing.T) { - var ( - expectKVPairsStore1 [][]byte - expectKVPairsStore2 [][]byte - ) + var expectKVPairsStore1, expectKVPairsStore2 [][]byte // write state changes testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) @@ -173,12 +167,11 @@ func testListenBlock(t 
*testing.T) { Delete: false, }) require.Nil(t, err) - expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1, expectedKVPair3) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2) // send the ABCI messages - err = testStreamingService.ListenBeginBlock(emptyContextWrap, testBeginBlockReq, testBeginBlockRes) + err = testStreamingService.ListenBeginBlock(emptyContext, testBeginBlockReq, testBeginBlockRes) require.Nil(t, err) // write state changes @@ -194,7 +187,6 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey2, @@ -202,7 +194,6 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -210,12 +201,11 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2, expectedKVPair3) // send the ABCI messages - err = testStreamingService.ListenDeliverTx(emptyContextWrap, testDeliverTxReq1, testDeliverTxRes1) + err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq1, testDeliverTxRes1) require.Nil(t, err) // write state changes @@ -231,7 +221,6 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey2, @@ -239,7 +228,6 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -247,12 +235,11 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair2) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair1, expectedKVPair3) // send the ABCI messages - err = testStreamingService.ListenDeliverTx(emptyContextWrap, testDeliverTxReq2, testDeliverTxRes2) + err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq2, testDeliverTxRes2) require.Nil(t, err) // write state changes @@ -268,7 +255,6 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey2, @@ -276,7 +262,6 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) - expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -295,6 +280,9 @@ func testListenBlock(t *testing.T) { err = testStreamingService.ListenCommit(emptyContextWrap, testCommitRes) require.Nil(t, err) + err = testStreamingService.ListenCommit(emptyContext, testCommitRes) + require.Nil(t, err) + // load the file, checking that it was created with the expected name metaFileName := fmt.Sprintf("%s-block-%d-meta", testPrefix, testBeginBlockReq.GetHeader().Height) dataFileName := fmt.Sprintf("%s-block-%d-data", testPrefix, testBeginBlockReq.GetHeader().Height) @@ -327,7 +315,15 @@ func testListenBlock(t *testing.T) { func readInFile(name string) ([]byte, error) { path := filepath.Join(testDir, name) - return os.ReadFile(path) + bz, err := os.ReadFile(path) + if err != nil { + return nil, err + } + size := 
sdk.BigEndianToUint64(bz[:8]) + if len(bz) != int(size)+8 { + return nil, errors.New("incomplete file") + } + return bz[8:], nil } // segmentBytes returns all of the protobuf messages contained in the byte array diff --git a/store/types/listening.go b/store/types/listening.go index 6b2ec68d603d..e2e840883da5 100644 --- a/store/types/listening.go +++ b/store/types/listening.go @@ -64,7 +64,7 @@ func NewMemoryListener(key StoreKey) *MemoryListener { return &MemoryListener{key: key} } -// OnWrite implements WriteListener interface. +// OnWrite implements WriteListener interface func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error { fl.stateCache = append(fl.stateCache, StoreKVPair{ StoreKey: storeKey.Name(), @@ -72,19 +72,17 @@ func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, d Key: key, Value: value, }) - return nil } -// PopStateCache returns the current state caches and set to nil. +// PopStateCache returns the current state cache and sets it to nil func (fl *MemoryListener) PopStateCache() []StoreKVPair { res := fl.stateCache fl.stateCache = nil - return res } -// StoreKey returns the storeKey it listens to. +// StoreKey returns the storeKey it listens to func (fl *MemoryListener) StoreKey() StoreKey { return fl.key } diff --git a/store/types/store.go b/store/types/store.go index 58ffcba8192e..ee22cb7d1288 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -160,13 +160,6 @@ type MultiStore interface { // tracing operations. The modified MultiStore is returned. SetTracingContext(TraceContext) MultiStore - // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey - ListeningEnabled(key StoreKey) bool - - // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey - // It appends the listeners to a current set, if one already exists - AddListeners(key StoreKey, listeners []WriteListener) - // LatestVersion returns the latest version in the store LatestVersion() int64 } @@ -229,6 +222,13 @@ type CommitMultiStore interface { // RollbackToVersion rollback the db to specific version(height). RollbackToVersion(version int64) error + + // ListeningEnabled returns if listening is enabled for the KVStore belonging to the provided StoreKey + ListeningEnabled(key StoreKey) bool + + // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey + // It appends the listeners to a current set, if one already exists + AddListeners(key StoreKey, listeners []WriteListener) } //---------subsp-------------------------------
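With `ListeningEnabled` and `AddListeners` now declared on `CommitMultiStore` rather than `MultiStore`, listeners are registered against the root store instead of the caching layers. The sketch below shows what that wiring might look like; the helper is illustrative, and in the SDK this wiring corresponds to `BaseApp.SetStreamingService`:

```go
package app

import (
	storetypes "github.com/cosmos/cosmos-sdk/store/types"
)

// registerStoreListeners attaches WriteListeners to the root CommitMultiStore
// for each store key; AddListeners appends to any existing set for that key.
func registerStoreListeners(cms storetypes.CommitMultiStore, listeners map[storetypes.StoreKey][]storetypes.WriteListener) {
	for key, ls := range listeners {
		if len(ls) == 0 {
			continue
		}
		cms.AddListeners(key, ls)
	}
}
```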