Skip to content

Commit

Permalink
blockfetcher: add retry for GET and SEARCH operations
Browse files Browse the repository at this point in the history
Close #3564

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Nov 29, 2024
1 parent 9fa07d8 commit 92ff840
Showing 1 changed file with 65 additions and 8 deletions.
73 changes: 65 additions & 8 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ const (
defaultHealthcheckTimeout = 10 * time.Second
)

// Constants related to retry mechanism.
const (
// maxRetries is the maximum number of retries for a single operation.
maxRetries = 5
// initialBackoff is the initial backoff duration.
initialBackoff = 500 * time.Millisecond
// backoffFactor is the factor by which the backoff duration is multiplied.
backoffFactor = 2
// maxBackoff is the maximum backoff duration.
maxBackoff = 20 * time.Second
)

// Ledger is an interface to Blockchain sufficient for Service.
type Ledger interface {
GetConfig() config.Blockchain
Expand Down Expand Up @@ -215,7 +227,9 @@ func (bfs *Service) oidDownloader() {
}
var force bool
if err != nil {
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
if !isContextCanceledErr(err) {
bfs.log.Error("NeoFS BlockFetcher service: OID downloading routine failed", zap.Error(err))
}
force = true
}
// Stop the service since there's nothing to do anymore.
Expand Down Expand Up @@ -489,21 +503,64 @@ func (bfs *Service) IsActive() bool {
return bfs.isActive.Load()
}

// retry function with exponential backoff.
func (bfs *Service) retry(action func() error) error {
var (
err error
backoff = initialBackoff
timer = time.NewTimer(0)
)
defer func() {
if !timer.Stop() {
<-timer.C
}
}()

for i := range maxRetries {
if err = action(); err == nil {
return nil
}
if i == maxRetries-1 {
break
}
timer.Reset(backoff)

select {
case <-timer.C:
case <-bfs.ctx.Done():
return bfs.ctx.Err()
}
backoff *= time.Duration(backoffFactor)
if backoff > maxBackoff {
backoff = maxBackoff
}
}
return err
}

func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid))
if err != nil {
return nil, err
}
rc, err := neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false)
if err != nil {
return nil, err
}

return rc, nil
var rc io.ReadCloser
err = bfs.retry(func() error {
rc, err = neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false)
return err
})
return rc, err
}

func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) {
return neofs.ObjectSearch(ctx, bfs.pool, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
var (
oids []oid.ID
err error
)
err = bfs.retry(func() error {
oids, err = neofs.ObjectSearch(ctx, bfs.pool, bfs.account.PrivateKey(), bfs.cfg.ContainerID, prm)
return err
})
return oids, err
}

// isContextCanceledErr returns whether error is a wrapped [context.Canceled].
Expand Down

0 comments on commit 92ff840

Please sign in to comment.