From 95982ce7cca105c127a87b9036aa14466251bda6 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Mon, 26 Aug 2024 17:31:02 +0300 Subject: [PATCH] services: add new service for fetching blocks from NeoFS Close #3496 Signed-off-by: Ekaterina Pavlova --- cli/server/dump_bin.go | 91 ++++ cli/server/dump_bin_test.go | 101 +++++ cli/server/server.go | 7 + config/protocol.testnet.yml | 16 + docs/node-configuration.md | 54 +++ go.mod | 2 +- pkg/config/application_config.go | 23 +- pkg/config/blockfetcher_config.go | 49 +++ pkg/config/config.go | 4 + pkg/network/server.go | 76 +++- pkg/network/server_config.go | 40 +- pkg/services/blockfetcher/blockfetcher.go | 406 ++++++++++++++++++ .../blockfetcher/blockfetcher_test.go | 75 ++++ pkg/services/oracle/neofs/neofs.go | 106 ++++- 14 files changed, 980 insertions(+), 70 deletions(-) create mode 100644 cli/server/dump_bin.go create mode 100644 cli/server/dump_bin_test.go create mode 100644 pkg/config/blockfetcher_config.go create mode 100644 pkg/services/blockfetcher/blockfetcher.go create mode 100644 pkg/services/blockfetcher/blockfetcher_test.go diff --git a/cli/server/dump_bin.go b/cli/server/dump_bin.go new file mode 100644 index 0000000000..e9e43487e0 --- /dev/null +++ b/cli/server/dump_bin.go @@ -0,0 +1,91 @@ +package server + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/nspcc-dev/neo-go/cli/cmdargs" + "github.com/nspcc-dev/neo-go/cli/options" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/urfave/cli/v2" +) + +func dumpBin(ctx *cli.Context) error { + if err := cmdargs.EnsureNone(ctx); err != nil { + return err + } + cfg, err := options.GetConfigFromContext(ctx) + if err != nil { + return cli.Exit(err, 1) + } + log, _, logCloser, err := options.HandleLoggingParams(ctx.Bool("debug"), cfg.ApplicationConfiguration) + if err != nil { + return cli.Exit(err, 1) + } + if logCloser != nil { + defer func() { _ = logCloser() }() + } + count := uint32(ctx.Uint("count")) + start := uint32(ctx.Uint("start")) + + chain, prometheus, pprof, err := initBCWithMetrics(cfg, log) + if err != nil { + return err + } + defer func() { + pprof.ShutDown() + prometheus.ShutDown() + chain.Close() + }() + + blocksCount := chain.BlockHeight() + 1 + if start+count > blocksCount { + return cli.Exit(fmt.Errorf("chain is not that high (%d) to dump %d blocks starting from %d", blocksCount-1, count, start), 1) + } + if count == 0 { + count = blocksCount - start + } + + out := ctx.String("out") + if out == "" { + return cli.Exit("output directory is not specified", 1) + } + if _, err = os.Stat(out); os.IsNotExist(err) { + if err = os.MkdirAll(out, os.ModePerm); err != nil { + return cli.Exit(fmt.Sprintf("failed to create directory %s: %s", out, err), 1) + } + } + if err != nil { + return cli.Exit(fmt.Sprintf("failed to check directory %s: %s", out, err), 1) + } + + for i := start; i < start+count; i++ { + bh := chain.GetHeaderHash(i) + blk, err := chain.GetBlock(bh) + if err != nil { + return cli.Exit(fmt.Sprintf("failed to get block %d: %s", i, err), 1) + } + filePath := filepath.Join(out, fmt.Sprintf("block-%d.bin", i)) + if err = saveBlockToFile(blk, filePath); err != nil { + return cli.Exit(fmt.Sprintf("failed to save block %d to file %s: %s", i, filePath, err), 1) + } + } + return nil +} + +func saveBlockToFile(blk *block.Block, filePath string) error { + file, err := os.Create(filePath) + if err != nil { + return err + } + defer file.Close() + + writer := io.NewBinWriterFromIO(file) + blk.EncodeBinary(writer) + if writer.Err != nil { + return writer.Err + } + return nil +} diff --git a/cli/server/dump_bin_test.go b/cli/server/dump_bin_test.go new file mode 100644 index 0000000000..5de271177f --- /dev/null +++ b/cli/server/dump_bin_test.go @@ -0,0 +1,101 @@ +package server_test + +import ( + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/nspcc-dev/neo-go/internal/testcli" + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestDumpBin(t *testing.T) { + tmpDir := t.TempDir() + + loadConfig := func(t *testing.T) config.Config { + chainPath := filepath.Join(tmpDir, "neogotestchain") + cfg, err := config.LoadFile(filepath.Join("..", "..", "config", "protocol.unit_testnet.yml")) + require.NoError(t, err, "could not load config") + cfg.ApplicationConfiguration.DBConfiguration.Type = dbconfig.LevelDB + cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions.DataDirectoryPath = chainPath + return cfg + } + + cfg := loadConfig(t) + out, err := yaml.Marshal(cfg) + require.NoError(t, err) + + cfgPath := filepath.Join(tmpDir, "protocol.unit_testnet.yml") + require.NoError(t, os.WriteFile(cfgPath, out, os.ModePerm)) + + e := testcli.NewExecutor(t, false) + + restoreArgs := []string{"neo-go", "db", "restore", + "--config-file", cfgPath, "--in", inDump} + e.Run(t, restoreArgs...) + + t.Run("missing output directory", func(t *testing.T) { + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", ""} + e.RunWithErrorCheck(t, "output directory is not specified", args...) + }) + + t.Run("successful dump", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "blocks") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", outDir, "--count", "5", "--start", "0"} + + e.Run(t, args...) + + require.DirExists(t, outDir) + + for i := 0; i < 5; i++ { + blockFile := filepath.Join(outDir, "block-"+strconv.Itoa(i)+".bin") + require.FileExists(t, blockFile) + } + }) + + t.Run("invalid block range", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "invalid-blocks") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", outDir, "--count", "1000", "--start", "0"} + + e.RunWithError(t, args...) + }) + + t.Run("output directory with no write permission", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "no-write-permission") + require.NoError(t, os.Mkdir(outDir, 0400)) + + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", outDir, "--count", "5", "--start", "0"} + + e.RunWithError(t, args...) + }) + + t.Run("zero blocks (full chain dump)", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "full-dump") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", cfgPath, "--out", outDir} + + e.Run(t, args...) + + require.DirExists(t, outDir) + for i := 0; i < 50; i++ { + blockFile := filepath.Join(outDir, "block-"+strconv.Itoa(i)+".bin") + require.FileExists(t, blockFile) + } + }) + + t.Run("invalid config file", func(t *testing.T) { + outDir := filepath.Join(tmpDir, "blocks") + args := []string{"neo-go", "db", "dump-bin", + "--config-file", "invalid-config-path", "--out", outDir} + + e.RunWithError(t, args...) + }) +} diff --git a/cli/server/server.go b/cli/server/server.go index 67787ced9b..fe8394d19a 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -105,6 +105,13 @@ func NewCommands() []*cli.Command { Action: dumpDB, Flags: cfgCountOutFlags, }, + { + Name: "dump-bin", + Usage: "Dump blocks (starting with the genesis or specified block) to the directory in binary format", + UsageText: "neo-go db dump-bin -o directory [-s start] [-c count] [--config-path path] [-p/-m/-t] [--config-file file]", + Action: dumpBin, + Flags: cfgCountOutFlags, + }, { Name: "restore", Usage: "Restore blocks from the file", diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index bdab3cd20d..85e35bcd00 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -100,3 +100,19 @@ ApplicationConfiguration: Enabled: false Addresses: - ":2113" +# NeoFSBlockFetcher: +# Enabled: true +# UnlockWallet: +# Path: "./testnet_wallet.json" +# Password: "111" +# Addresses: +# - st1.t5.fs.neo.org:8080 +# Timeout: 30s +# ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" +# Mode: "oidSearch" # other options: 'indexSearch', 'oidSearch' +# BatchSize: 1000 +# BlockAttribute: "blocks_index_1" +# OidAttribute: "block_oids_1" +# HeaderAttribute: "index_header" + + diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 83c149e620..35c04dfa4f 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -21,6 +21,7 @@ node-related settings described in the table below. | GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. | | KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store the latest state (or a set of latest states, see `P2PStateExchangeExtensions` section in the ProtocolConfiguration for details). If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | | | LogPath | `string` | "", so only console logging | File path where to store node logs. | +| NeoFSBlockFetcher | [NeoFSBlockFetcher Configuration](#NeoFSBlockFetcher-Configuration) | | NeoFSBlockFetcher module configuration. See the [NeoFSBlockFetcher Configuration](#Oracle-Configuration) section for details. | | Oracle | [Oracle Configuration](#Oracle-Configuration) | | Oracle module configuration. See the [Oracle Configuration](#Oracle-Configuration) section for details. | | P2P | [P2P Configuration](#P2P-Configuration) | | Configuration values for P2P network interaction. See the [P2P Configuration](#P2P-Configuration) section for details. | | P2PNotary | [P2P Notary Configuration](#P2P-Notary-Configuration) | | P2P Notary module configuration. See the [P2P Notary Configuration](#P2P-Notary-Configuration) section for details. | @@ -323,6 +324,59 @@ where: - `Path` is a path to wallet. - `Password` is a wallet password. +### NeoFSBlockFetcher Configuration +NeoFSBlockFetcher: +Enabled: true +UnlockWallet: +Path: "./testnet_wallet.json" +Password: "111" +Addresses: +- st1.t5.fs.neo.org:8080 +Timeout: 30s +ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" +Mode: "oidSearch" # other options: 'indexSearch', 'oidSearch' +BatchSize: 1000 +BlockAttribute: "blocks_index_1" +OidAttribute: "block_oids_1" +HeaderAttribute: "index_header" + + +### NeoFSBlockFetcher Configuration +`NeoFSBlockFetcher` configuration section contains settings for NeoFS block fetcher +module and has the following structure: +``` +NeoFSBlockFetcher: + Enabled: false + UnlockWallet: + Path: "./wallet.json" + Password: "pass" + Addresses: + - st1.t5.fs.neo.org:8080 + Timeout: 30s + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" + Mode: "oidSearch" + BatchSize: 1000 + BlockAttribute: "blocks_index_1" + OidAttribute: "block_oids_1" + HeaderAttribute: "index_header" +``` +where: +- `Enabled` enables NeoFS block fetcher module. +- `UnlockWallet` contains wallet settings, see + [Unlock Wallet Configuration](#Unlock-Wallet-Configuration) section for + structure details. +- `Addresses` is a list of NeoFS storage nodes addresses. +- `Timeout` is a timeout for NeoFS storage nodes requests. +- `ContainerID` is a container ID to fetch blocks from. +- `Mode` is a mode of fetching blocks from NeoFS storage nodes. Available options: + - `oidSearch` - fetch blocks by their OIDs. + - `indexSearch` - fetch blocks by their indexes. +- `BatchSize` is a number of blocks to fetch in a single request. +- `BlockAttribute` is an attribute name in the container that contains blocks. +- `OidAttribute` is an attribute name in the container that contains block OIDs. +- `HeaderAttribute` is an attribute name in the container that contains block headers. + + ## Protocol Configuration `ProtocolConfiguration` section of `yaml` node configuration file contains diff --git a/go.mod b/go.mod index 73d0953100..a6d73c5ee3 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( golang.org/x/term v0.18.0 golang.org/x/text v0.14.0 golang.org/x/tools v0.19.0 + google.golang.org/grpc v1.62.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -67,7 +68,6 @@ require ( golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect google.golang.org/protobuf v1.33.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 2e94961d7b..604d3a40e6 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -23,12 +23,13 @@ type ApplicationConfiguration struct { Pprof BasicService `yaml:"Pprof"` Prometheus BasicService `yaml:"Prometheus"` - Relay bool `yaml:"Relay"` - Consensus Consensus `yaml:"Consensus"` - RPC RPC `yaml:"RPC"` - Oracle OracleConfiguration `yaml:"Oracle"` - P2PNotary P2PNotary `yaml:"P2PNotary"` - StateRoot StateRoot `yaml:"StateRoot"` + Relay bool `yaml:"Relay"` + Consensus Consensus `yaml:"Consensus"` + RPC RPC `yaml:"RPC"` + Oracle OracleConfiguration `yaml:"Oracle"` + P2PNotary P2PNotary `yaml:"P2PNotary"` + StateRoot StateRoot `yaml:"StateRoot"` + NeoFSBlockFetcher NeoFSBlockFetcher `yaml:"NeoFSBlockFetcher"` } // EqualsButServices returns true when the o is the same as a except for services @@ -145,3 +146,13 @@ func (a *ApplicationConfiguration) GetAddresses() ([]AnnounceableAddress, error) } return addrs, nil } + +// Validate checks ApplicationConfiguration for internal consistency and returns +// an error if any invalid settings are found. This ensures that the application +// configuration is valid and safe to use for further operations. +func (a *ApplicationConfiguration) Validate() error { + if err := a.NeoFSBlockFetcher.Validate(); err != nil { + return fmt.Errorf("failed to validate NeoFSBlockFetcher section: %w", err) + } + return nil +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go new file mode 100644 index 0000000000..325b087d9b --- /dev/null +++ b/pkg/config/blockfetcher_config.go @@ -0,0 +1,49 @@ +package config + +import ( + "errors" + "fmt" + "time" + + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// NeoFSBlockFetcher represents the configuration for the NeoFS block fetcher service. +type NeoFSBlockFetcher struct { + InternalService `yaml:",inline"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + Mode string `yaml:"Mode"` + Addresses []string `yaml:"Addresses"` + BatchSize int `yaml:"BatchSize"` + BlockAttribute string `yaml:"BlockAttribute"` + OidAttribute string `yaml:"OidAttribute"` + HeaderAttribute string `yaml:"HeaderAttribute"` +} + +// Validate checks NeoFSBlockFetcher for internal consistency and ensures +// that all required fields are properly set. It returns an error if the +// configuration is invalid or if the ContainerID cannot be properly decoded. +func (cfg *NeoFSBlockFetcher) Validate() error { + if !cfg.Enabled { + return nil + } + if cfg.ContainerID == "" { + return errors.New("container ID is not set") + } + var containerID cid.ID + err := containerID.DecodeString(cfg.ContainerID) + if err != nil { + return fmt.Errorf("invalid container ID: %w", err) + } + if cfg.Timeout == 0 { + cfg.Timeout = 10 * time.Second + } + if cfg.BatchSize == 0 { + cfg.BatchSize = 50 + } + if cfg.Mode == "" { + cfg.Mode = "indexSearch" + } + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index a82838ec21..0278614333 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -116,6 +116,10 @@ func LoadFile(configPath string, relativePath ...string) (Config, error) { if err != nil { return Config{}, err } + err = config.ApplicationConfiguration.Validate() + if err != nil { + return Config{}, err + } return config, nil } diff --git a/pkg/network/server.go b/pkg/network/server.go index 34da851b52..e997c26544 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -103,10 +104,12 @@ type ( chain Ledger bQueue *bqueue.Queue bSyncQueue *bqueue.Queue + bFetcherQueue *bqueue.Queue mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -133,6 +136,7 @@ type ( runFin chan struct{} broadcastTxFin chan struct{} runProtoFin chan struct{} + blockFetcherFin chan struct{} transactions chan *transaction.Transaction @@ -182,28 +186,29 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s := &Server{ - ServerConfig: config, - chain: chain, - id: randomID(), - config: chain.GetConfig().ProtocolConfiguration, - quit: make(chan struct{}), - relayFin: make(chan struct{}), - runFin: make(chan struct{}), - broadcastTxFin: make(chan struct{}), - runProtoFin: make(chan struct{}), - register: make(chan Peer), - unregister: make(chan peerDrop), - handshake: make(chan Peer), - txInMap: make(map[util.Uint256]struct{}), - peers: make(map[Peer]bool), - mempool: chain.GetMemPool(), - extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), - log: log, - txin: make(chan *transaction.Transaction, 64), - transactions: make(chan *transaction.Transaction, 64), - services: make(map[string]Service), - extensHandlers: make(map[string]func(*payload.Extensible) error), - stateSync: stSync, + ServerConfig: config, + chain: chain, + id: randomID(), + config: chain.GetConfig().ProtocolConfiguration, + quit: make(chan struct{}), + relayFin: make(chan struct{}), + runFin: make(chan struct{}), + broadcastTxFin: make(chan struct{}), + runProtoFin: make(chan struct{}), + blockFetcherFin: make(chan struct{}, 1), + register: make(chan Peer), + unregister: make(chan peerDrop), + handshake: make(chan Peer), + txInMap: make(map[util.Uint256]struct{}), + peers: make(map[Peer]bool), + mempool: chain.GetMemPool(), + extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), + log: log, + txin: make(chan *transaction.Transaction, 64), + transactions: make(chan *transaction.Transaction, 64), + services: make(map[string]Service), + extensHandlers: make(map[string]func(*payload.Extensible) error), + stateSync: stSync, } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) @@ -220,6 +225,11 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) + s.bFetcherQueue = bqueue.New(chain, log, nil, updateBlockQueueLenMetric) + s.blockFetcher = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, s.bFetcherQueue, log, func() { + close(s.blockFetcherFin) + }) + if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -295,6 +305,13 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + go s.bFetcherQueue.Run() + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + err := s.blockFetcher.Start() + if err != nil { + s.blockFetcher.Shutdown() + } + } for _, tr := range s.transports { go tr.Accept() } @@ -319,6 +336,7 @@ func (s *Server) Shutdown() { } s.bQueue.Discard() s.bSyncQueue.Discard() + s.bFetcherQueue.Discard() s.serviceLock.RLock() for _, svc := range s.services { svc.Shutdown() @@ -550,6 +568,9 @@ func (s *Server) run() { s.discovery.RegisterGood(p) + s.tryInitStateSync() + s.tryStartServices() + case <-s.blockFetcherFin: s.tryInitStateSync() s.tryStartServices() } @@ -706,7 +727,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() { + if s.stateSync.IsActive() || s.blockFetcher.IsActive() { return false } @@ -766,6 +787,9 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.IsActive() { return s.bSyncQueue.PutBlock(block) } @@ -786,6 +810,9 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } if s.stateSync.NeedHeaders() { if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) @@ -1434,6 +1461,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { + if s.blockFetcher.IsActive() { + return + } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..2e3fb141a4 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,9 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + // NeoFSBlockFetcherCfg is the configuration for the blockfetcher service. + NeoFSBlockFetcherCfg config.NeoFSBlockFetcher } ) @@ -89,24 +92,25 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { return ServerConfig{}, fmt.Errorf("failed to parse addresses: %w", err) } c := ServerConfig{ - UserAgent: cfg.GenerateUserAgent(), - Addresses: addrs, - Net: protoConfig.Magic, - Relay: appConfig.Relay, - Seeds: protoConfig.SeedList, - DialTimeout: appConfig.P2P.DialTimeout, - ProtoTickInterval: appConfig.P2P.ProtoTickInterval, - PingInterval: appConfig.P2P.PingInterval, - PingTimeout: appConfig.P2P.PingTimeout, - MaxPeers: appConfig.P2P.MaxPeers, - AttemptConnPeers: appConfig.P2P.AttemptConnPeers, - MinPeers: appConfig.P2P.MinPeers, - TimePerBlock: protoConfig.TimePerBlock, - OracleCfg: appConfig.Oracle, - P2PNotaryCfg: appConfig.P2PNotary, - StateRootCfg: appConfig.StateRoot, - ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, - BroadcastFactor: appConfig.P2P.BroadcastFactor, + UserAgent: cfg.GenerateUserAgent(), + Addresses: addrs, + Net: protoConfig.Magic, + Relay: appConfig.Relay, + Seeds: protoConfig.SeedList, + DialTimeout: appConfig.P2P.DialTimeout, + ProtoTickInterval: appConfig.P2P.ProtoTickInterval, + PingInterval: appConfig.P2P.PingInterval, + PingTimeout: appConfig.P2P.PingTimeout, + MaxPeers: appConfig.P2P.MaxPeers, + AttemptConnPeers: appConfig.P2P.AttemptConnPeers, + MinPeers: appConfig.P2P.MinPeers, + TimePerBlock: protoConfig.TimePerBlock, + OracleCfg: appConfig.Oracle, + P2PNotaryCfg: appConfig.P2PNotary, + StateRootCfg: appConfig.StateRoot, + ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, + BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSBlockFetcherCfg: appConfig.NeoFSBlockFetcher, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..9b8cb464c5 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,406 @@ +package blockfetcher + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io" + "net/url" + "sync" + "sync/atomic" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" + "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +const ( + ModeIndexSearch = "indexSearch" + ModeOidSearch = "oidSearch" + + oidSize = sha256.Size + oidBatchSize = 2000 +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + AddBlock(block *block.Block) error + GetConfig() config.Blockchain + BlockHeight() uint32 + AddHeaders(...*block.Header) error + HeaderHeight() uint32 +} + +// Service is a service that fetches blocks from NeoFS. +type Service struct { + chain Ledger + client *client.Client + log *zap.Logger + quit chan bool + cfg config.NeoFSBlockFetcher + queue *bqueue.Queue + account *wallet.Account + started atomic.Bool + stateRootInHeader bool + shutdownCallback func() +} + +// New creates a new BlockFetcherService. +func New(chain Ledger, cfg config.NeoFSBlockFetcher, queue *bqueue.Queue, logger *zap.Logger, shutdownCallback func()) *Service { + var ( + account *wallet.Account + err error + ) + + if cfg.UnlockWallet.Path != "" { + walletFromFile, err := wallet.NewWalletFromFile(cfg.UnlockWallet.Path) + if err != nil { + logger.Error("BlockFetcher failed to load wallet from file", zap.Error(err)) + } + for _, acc := range walletFromFile.Accounts { + if err := acc.Decrypt(cfg.UnlockWallet.Password, walletFromFile.Scrypt); err == nil { + account = acc + break + } + } + if account == nil { + logger.Error("BlockFetcher failed to decrypt any account in the wallet") + return nil + } + } else { + account, err = wallet.NewAccount() + if err != nil { + logger.Error("BlockFetcher failed to create new account", zap.Error(err)) + return nil + } + } + return &Service{ + chain: chain, + log: logger, + quit: make(chan bool), + cfg: cfg, + queue: queue, + account: account, + stateRootInHeader: chain.GetConfig().StateRootInHeader, + shutdownCallback: shutdownCallback, + } +} + +// Start runs the block fetcher service. +func (bfs *Service) Start() error { + if !bfs.started.CompareAndSwap(false, true) { + return errors.New("NeoFS block fetcher service is already running") + } + bfs.log.Info("starting NeoFS block fetcher service") + var err error + bfs.client, err = neofs.GetSDKClient(context.Background(), bfs.cfg.Addresses[0]) + if err != nil { + bfs.log.Error("NeoFS block fetcher service: failed to create SDK client", + zap.String("address", bfs.cfg.Addresses[0]), zap.Error(err)) + return err + } + go bfs.start() + return nil +} + +func (bfs *Service) start() { + defer bfs.Shutdown() + var err error + switch bfs.cfg.Mode { + case ModeIndexSearch: + err = bfs.fetchData() + case ModeOidSearch: + err = bfs.fetchDataWithOid() + default: + bfs.log.Error("NeoFS block fetcher service: invalid mode specified", zap.String("mode", bfs.cfg.Mode)) + return + } + + if err != nil { + bfs.log.Error("NeoFS block fetcher service: fetch operation failed", zap.Error(err)) + return + } +} + +// Shutdown stops the block fetcher service. +func (bfs *Service) Shutdown() { + if !bfs.started.CompareAndSwap(true, false) { + return + } + bfs.log.Info("shutting down NeoFS block fetcher service") + bfs.client.Close() + close(bfs.quit) + bfs.shutdownCallback() + _ = bfs.log.Sync() +} + +// IsActive returns true if the block fetcher service is running. +func (bfs *Service) IsActive() bool { + return bfs.started.Load() +} + +func (bfs *Service) fetchData() error { + startIndex := bfs.chain.BlockHeight() + batchSize := uint32(bfs.cfg.BatchSize) + + for { + select { + case <-bfs.quit: + bfs.log.Info("stopping NeoFS data fetching operation.") + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE) + prm.SetFilters(filters) + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer cancel() + blockOids, err := bfs.search(ctx, prm) + bfs.log.Info(fmt.Sprintf("NeoFS block fetcher service: found %d blocks from index %d to %d", len(blockOids), startIndex, startIndex+batchSize-1)) + if err != nil { + bfs.log.Error(fmt.Sprintf("NeoFS block fetcher service: failed to find %s object with index from %d to %d", bfs.cfg.BlockAttribute, startIndex, startIndex+batchSize-1), zap.Error(err)) + return err + } + + if len(blockOids) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS block fetcher service: no block found with index %d, stopping.", startIndex)) + return nil + } + err = bfs.fetchAndProcessBlocks(blockOids) + if err != nil { + return err + } + startIndex += batchSize + } + } +} + +func (bfs *Service) fetchAndProcessBlocks(blockOids []oid.ID) error { + var ( + wg sync.WaitGroup + lastError atomic.Value + ) + + for _, oidBlock := range blockOids { + wg.Add(1) + go func(oidBlock oid.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer cancel() + rc, err := bfs.get(ctx, oidBlock.String()) + if err != nil { + lastError.Store(err.Error()) + return + } + err = bfs.processBlock(rc) + if err != nil { + lastError.Store(err.Error()) + return + } + }(oidBlock) + } + wg.Wait() + if loadedErr := lastError.Load(); loadedErr != nil { + if errStr, ok := loadedErr.(string); ok && errStr != "" { + return fmt.Errorf(errStr) + } + } + return nil +} + +func (bfs *Service) fetchDataWithOid() error { + startIndex := bfs.chain.BlockHeight()/oidBatchSize + 1 + skip := bfs.chain.BlockHeight() % oidBatchSize + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.OidAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual) + prm.SetFilters(filters) + + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + blockOidsObject, err := bfs.search(ctx, prm) + cancel() + if err != nil { + return fmt.Errorf("failed to find '%s' object with index %d: %w", bfs.cfg.OidAttribute, startIndex, err) + } + if len(blockOidsObject) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS block fetcher service: no '%s' object found with index %d, stopping.", bfs.cfg.OidAttribute, startIndex)) + return nil + } + + blockCtx, blockCancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer blockCancel() + blockOidsData, err := bfs.get(blockCtx, blockOidsObject[0].String()) + if err != nil { + return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.OidAttribute, startIndex, err) + } + blockOIDs, err := parseBlockOIDs(blockOidsData) + if err != nil { + return fmt.Errorf("failed to parse '%s' object with index %d: %w", bfs.cfg.OidAttribute, startIndex, err) + } + + if len(blockOIDs) == 0 { + return fmt.Errorf("no blocks found with index %d, stopping", startIndex) + } + + blockOIDs = blockOIDs[skip:] + + for len(blockOIDs) > 0 { + batch := blockOIDs + if len(blockOIDs) > bfs.cfg.BatchSize { + batch = blockOIDs[:bfs.cfg.BatchSize] + blockOIDs = blockOIDs[bfs.cfg.BatchSize:] + } else { + blockOIDs = nil + } + + select { + case <-bfs.quit: + return nil + default: + err := bfs.fetchAndProcessBlocks(batch) + if err != nil { + return err + } + } + } + + startIndex++ + skip = 0 + } + } +} + +func (bfs *Service) processBlock(rc io.ReadCloser) error { + b := block.New(bfs.stateRootInHeader) + r := gio.NewBinReaderFromIO(rc) + b.DecodeBinary(r) + rc.Close() + if r.Err != nil { + return r.Err + } + if b.Index%1000 == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS block fetcher service: processing block index: %d", b.Index)) + } + err := bfs.queue.PutBlock(b) + if err != nil { + return err + } + return nil +} + +// GetHeaders fetches headers from NeoFS. +func (bfs *Service) GetHeaders() error { + startIndex := bfs.chain.HeaderHeight() / 2000 + + for { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(bfs.cfg.HeaderAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE) + filters.AddFilter(bfs.cfg.HeaderAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumLE) + prm.SetFilters(filters) + + ctx, cancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + + headerOids, err := bfs.search(ctx, prm) + cancel() + if err != nil { + bfs.log.Error(fmt.Sprintf("NeoFS block fetcher service: failed to find %s object with index %d", bfs.cfg.HeaderAttribute, startIndex), zap.Error(err)) + return err + } + + if len(headerOids) == 0 { + bfs.log.Info(fmt.Sprintf("NeoFS block fetcher service: no %s object found with index %d, stopping.", bfs.cfg.HeaderAttribute, startIndex)) + return fmt.Errorf("NeoFS block fetcher service: no %s object found", bfs.cfg.OidAttribute) + } + + headerCtx, headerCancel := context.WithTimeout(context.Background(), bfs.cfg.Timeout) + defer headerCancel() + rc, err := bfs.get(headerCtx, headerOids[0].String()) + if err != nil { + bfs.log.Error(fmt.Sprintf("NeoFS block fetcher service: failed to fetch %s object with index %d", bfs.cfg.OidAttribute, startIndex), zap.Error(err)) + return err + } + + err = bfs.processHeaders(rc) + if err != nil { + bfs.log.Error(fmt.Sprintf("NeoFS block fetcher service: failed to process headers for index %d", startIndex), zap.Error(err)) + return err + } + startIndex++ + } + } +} + +func (bfs *Service) processHeaders(rc io.ReadCloser) error { + defer rc.Close() + var resHeader payload.Headers + br := gio.NewBinReaderFromIO(rc) + resHeader.DecodeBinary(br) + err := bfs.chain.AddHeaders(resHeader.Hdrs...) + if err != nil { + return err + } + return nil +} + +func (bfs *Service) get(ctx context.Context, oid string) (io.ReadCloser, error) { + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, bfs.account.PrivateKey(), u, false) + if err != nil { + return nil, err + } + + return rc, nil +} + +func (bfs *Service) search(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { + return neofs.ObjectSearch(ctx, bfs.client, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm) +} + +func parseBlockOIDs(rc io.ReadCloser) ([]oid.ID, error) { + defer rc.Close() + data, err := io.ReadAll(rc) + if err != nil { + return nil, fmt.Errorf("NeoFS block fetcher service: failed to read data: %w", err) + } + oids := make([]oid.ID, 0, oidBatchSize) + + if len(data)%oidSize != 0 { + return nil, fmt.Errorf("NeoFS block fetcher service: invalid data length: not a multiple of oid size") + } + + for i := 0; i < len(data); i += oidSize { + oidBytes := data[i : i+oidSize] + var oidBlock oid.ID + err := oidBlock.Decode(oidBytes) + if err != nil { + return nil, fmt.Errorf("NeoFS block fetcher service: failed to decode OID: %w", err) + } + oids = append(oids, oidBlock) + } + + return oids, nil +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..f7c048885d --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,75 @@ +package blockfetcher_test + +import ( + "context" + "fmt" + "io" + "net/url" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/stretchr/testify/require" +) + +func TestService(t *testing.T) { + cfg := config.NeoFSBlockFetcher{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + InternalService: config.InternalService{ + Enabled: true, + UnlockWallet: config.Wallet{Password: "one", Path: "./"}, + }, + Addresses: []string{"st1.t5.fs.neo.org:8080"}, + Timeout: 15 * time.Second, + } + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchStringEqual) + filters.AddFilter("index_block", fmt.Sprintf("%d", 0), object.MatchNumLE) + prm.SetFilters(filters) + + privateKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + var containerID cid.ID + err = containerID.DecodeString(cfg.ContainerID) + require.NoError(t, err) + var ( + s = user.NewAutoIDSignerRFC6979(privateKey.PrivateKey) + objectIDs []oid.ID + ) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) + defer cancel() + + neofsClient, err := neofs.GetSDKClient(ctx, + cfg.Addresses[0]) + require.NoError(t, err) + + reader, err := neofsClient.ObjectSearchInit(ctx, containerID, s, prm) + require.NoError(t, err) + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + require.NoError(t, err) + fmt.Println(objectIDs) + + oid := "3uHQb3SYPEhoxJigTtRALwhiha3nCzL7GsN6PGYMjwhT" + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", containerID, oid)) + require.NoError(t, err) + rc, err := neofs.GetWithClient(ctx, neofsClient, privateKey, u, false) + require.NoError(t, err) + data, err := io.ReadAll(rc) + require.NoError(t, err) + fmt.Println(data) +} diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b2..57153e3da4 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" @@ -17,6 +18,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -45,41 +48,47 @@ var ( // URI scheme is "neofs://". // If Command is not provided, full object is requested. func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { - objectAddr, ps, err := parseNeoFSURL(u) + c, err := GetSDKClient(ctx, addr) if err != nil { - return nil, err + return clientCloseWrapper{c: c}, fmt.Errorf("failed to create client: %w", err) } + return GetWithClient(ctx, c, priv, u, true) +} - c, err := client.New(client.PrmInit{}) +// GetWithClient returns a neofs object from the provided url using the provided client. +// URI scheme is "neofs://". +// If Command is not provided, full object is requested. +func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) { + objectAddr, ps, err := parseNeoFSURL(u) if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) + return nil, err } - var ( - res = clientCloseWrapper{c: c} - prmd client.PrmDial + result io.ReadCloser + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) ) - prmd.SetServerURI(addr) - prmd.SetContext(ctx) - err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter - if err != nil { - return res, err - } - - var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) switch { - case len(ps) == 0 || ps[0] == "": // Get request - res.ReadCloser, err = getPayload(ctx, s, c, objectAddr) + case len(ps) == 0 || ps[0] == "": + result, err = getPayload(ctx, s, c, objectAddr) case ps[0] == rangeCmd: - res.ReadCloser, err = getRange(ctx, s, c, objectAddr, ps[1:]...) + result, err = getRange(ctx, s, c, objectAddr, ps[1:]...) case ps[0] == headerCmd: - res.ReadCloser, err = getHeader(ctx, s, c, objectAddr) + result, err = getHeader(ctx, s, c, objectAddr) case ps[0] == hashCmd: - res.ReadCloser, err = getHash(ctx, s, c, objectAddr, ps[1:]...) + result, err = getHash(ctx, s, c, objectAddr, ps[1:]...) default: - err = ErrInvalidCommand + return nil, ErrInvalidCommand + } + if err != nil { + return nil, err } - return res, err + if wrapClientCloser { + return clientCloseWrapper{ + c: c, + ReadCloser: result, + }, nil + } + return result, nil } type clientCloseWrapper struct { @@ -220,3 +229,56 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + containerID cid.ID + ) + err := containerID.DecodeString(containerIDStr) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err) + } + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object IDs iteration: %w", err) + } + return objectIDs, nil +} + +// GetSDKClient returns a NeoFS SDK client configured with the specified address and context. +func GetSDKClient(ctx context.Context, addr string) (*client.Client, error) { + var ( + prmDial client.PrmDial + ) + + prmDial.SetServerURI(addr) + prmDial.SetContext(ctx) + prmDial.SetTimeout(10 * time.Minute) + prmDial.SetStreamTimeout(10 * time.Minute) + + c, err := client.New(client.PrmInit{}) + if err != nil { + return nil, fmt.Errorf("can't create SDK client: %w", err) + } + + if err := c.Dial(prmDial); err != nil { + if status.Code(err) == codes.Unimplemented { + return c, nil + } + return nil, fmt.Errorf("can't init SDK client: %w", err) + } + + return c, nil +}