Skip to content

Commit

Permalink
chore: optimize unhandled file close error (#2599)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored Aug 8, 2023
1 parent 9446250 commit 342438e
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 74 deletions.
56 changes: 35 additions & 21 deletions client/daemon/objectstorage/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package objectstorage
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -585,37 +586,49 @@ func (o *objectStorage) copyObject(ctx *gin.Context) {
}

// getAvailableSeedPeer uses to calculate md5 with file header.
func (o *objectStorage) md5FromFileHeader(fileHeader *multipart.FileHeader) *digest.Digest {
func (o *objectStorage) md5FromFileHeader(fileHeader *multipart.FileHeader) (dig *digest.Digest) {
f, err := fileHeader.Open()
if err != nil {
return nil
}
defer f.Close()
defer func() {
errClose := f.Close()
if errClose != nil {
dig = nil
}
}()

return digest.New(digest.AlgorithmMD5, digest.MD5FromReader(f))
}

// importObjectToBackend uses to import object to backend.
func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, objectKey string, dgst *digest.Digest, fileHeader *multipart.FileHeader, client objectstorage.ObjectStorage) error {
func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, objectKey string, dgst *digest.Digest, fileHeader *multipart.FileHeader, client objectstorage.ObjectStorage) (err error) {
f, err := fileHeader.Open()
if err != nil {
return err
}
defer f.Close()
defer func() {
errClose := f.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()

if err := client.PutObject(ctx, bucketName, objectKey, dgst.String(), f); err != nil {
return err
}
return nil
return client.PutObject(ctx, bucketName, objectKey, dgst.String(), f)
}

// importObjectToSeedPeers uses to import object to local storage.
func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID, peerID string, fileHeader *multipart.FileHeader) error {
func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID, peerID string, fileHeader *multipart.FileHeader) (err error) {
f, err := fileHeader.Open()
if err != nil {
return nil
}
defer f.Close()
defer func() {
errClose := f.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()

meta := storage.PeerTaskMetadata{
TaskID: taskID,
Expand All @@ -631,11 +644,7 @@ func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID,
}

// Import task data to dfdaemon.
if err := o.peerTaskManager.GetPieceManager().Import(ctx, meta, tsd, fileHeader.Size, f); err != nil {
return err
}

return nil
return o.peerTaskManager.GetPieceManager().Import(ctx, meta, tsd, fileHeader.Size, f)
}

// importObjectToSeedPeers uses to import object to available seed peers.
Expand Down Expand Up @@ -674,22 +683,27 @@ func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName,
}

// importObjectToSeedPeer uses to import object to seed peer.
func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost, bucketName, objectKey, filter string, mode int, fileHeader *multipart.FileHeader) error {
func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost, bucketName, objectKey, filter string, mode int, fileHeader *multipart.FileHeader) (err error) {
f, err := fileHeader.Open()
if err != nil {
return err
}
defer f.Close()
defer func() {
errClose := f.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()

body := &bytes.Buffer{}
writer := multipart.NewWriter(body)

if err := writer.WriteField("mode", fmt.Sprint(mode)); err != nil {
if err = writer.WriteField("mode", fmt.Sprint(mode)); err != nil {
return err
}

if filter != "" {
if err := writer.WriteField("filter", filter); err != nil {
if err = writer.WriteField("filter", filter); err != nil {
return err
}
}
Expand All @@ -699,11 +713,11 @@ func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost
return err
}

if _, err := io.Copy(part, f); err != nil {
if _, err = io.Copy(part, f); err != nil {
return err
}

if err := writer.Close(); err != nil {
if err = writer.Close(); err != nil {
return err
}

Expand Down
9 changes: 7 additions & 2 deletions client/daemon/peer/piece_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (pm *pieceManager) processPieceFromFile(ctx context.Context, ptm storage.Pe
return n, nil
}

func (pm *pieceManager) ImportFile(ctx context.Context, ptm storage.PeerTaskMetadata, tsd storage.TaskStorageDriver, req *dfdaemonv1.ImportTaskRequest) error {
func (pm *pieceManager) ImportFile(ctx context.Context, ptm storage.PeerTaskMetadata, tsd storage.TaskStorageDriver, req *dfdaemonv1.ImportTaskRequest) (err error) {
log := logger.With("function", "ImportFile", "URL", req.Url, "taskID", ptm.TaskID)
// get file size and compute piece size and piece count
stat, err := os.Stat(req.Path)
Expand All @@ -673,7 +673,12 @@ func (pm *pieceManager) ImportFile(ctx context.Context, ptm storage.PeerTaskMeta
log.Error(msg)
return errors.New(msg)
}
defer file.Close()
defer func() {
errClose := file.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()

reader := file
for pieceNum := int32(0); pieceNum < maxPieceNum; pieceNum++ {
Expand Down
58 changes: 43 additions & 15 deletions client/daemon/storage/local_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -98,7 +99,7 @@ func (t *localTaskStore) SubTask(req *RegisterSubTaskRequest) *localSubTaskStore
return subtask
}

func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error) {
func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) (n int64, err error) {
t.touch()

// piece already exists
Expand All @@ -107,7 +108,7 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest)
t.RUnlock()
t.Debugf("piece %d already exist,ignore writing piece", req.Num)
// discard already downloaded data for back source
n, err := io.CopyN(io.Discard, req.Reader, piece.Range.Length)
n, err = io.CopyN(io.Discard, req.Reader, piece.Range.Length)
if err != nil && err != io.EOF {
return n, err
}
Expand All @@ -128,12 +129,18 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest)
if err != nil {
return 0, err
}
defer file.Close()
defer func() {
errClose := file.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()

if _, err = file.Seek(req.Range.Start, io.SeekStart); err != nil {
return 0, err
}

n, err := io.Copy(file, io.LimitReader(req.Reader, req.Range.Length))
n, err = io.Copy(file, io.LimitReader(req.Reader, req.Range.Length))
if err != nil {
return n, err
}
Expand Down Expand Up @@ -344,7 +351,7 @@ func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRe
}, nil
}

func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) (err error) {
// Store is called in callback.Done, mark local task store done, for fast search
t.Done = true
t.touch()
Expand All @@ -355,7 +362,7 @@ func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
}

if !req.StoreDataOnly {
err := t.saveMetadata()
err = t.saveMetadata()
if err != nil {
t.Warnf("save task metadata error: %s", err)
return err
Expand All @@ -373,11 +380,16 @@ func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
return hardlink(t.SugaredLoggerOnWith, req.Destination, t.DataFilePath)
}

_, err := os.Stat(req.Destination)
_, err = os.Stat(req.Destination)
if err == nil {
// remove exist file
t.Infof("destination file %q exists, purge it first", req.Destination)
os.Remove(req.Destination)
err = os.Remove(req.Destination)
if err != nil {
err = fmt.Errorf("purge destination file %q exists error: %s", req.Destination, err)
t.Errorf(err.Error())
return err
}
}
// 1. try to link
err = os.Link(t.DataFilePath, req.Destination)
Expand All @@ -392,7 +404,12 @@ func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
t.Debugf("open tasks data error: %s", err)
return err
}
defer file.Close()
defer func() {
errClose := file.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()

_, err = file.Seek(0, io.SeekStart)
if err != nil {
Expand All @@ -404,7 +421,12 @@ func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error {
t.Errorf("open tasks destination file error: %s", err)
return err
}
defer dstFile.Close()
defer func() {
errClose := dstFile.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()
// copy_file_range is valid in linux
// https://go-review.googlesource.com/c/go/+/229101/
n, err := io.Copy(dstFile, file)
Expand Down Expand Up @@ -567,18 +589,19 @@ func (t *localTaskStore) Reclaim() error {
t.Infof("purged task work directory: %s", t.dataDir)

taskDir := path.Dir(t.dataDir)
if dirs, err := os.ReadDir(taskDir); err != nil {
dirs, err := os.ReadDir(taskDir)
if err != nil {
t.Warnf("stat task directory %q error: %s", taskDir, err)
} else {
if len(dirs) == 0 {
if err := os.Remove(taskDir); err != nil {
if err = os.Remove(taskDir); err != nil {
t.Warnf("remove unused task directory %q error: %s", taskDir, err)
}
} else {
t.Warnf("task directory %q is not empty", taskDir)
}
}
return nil
return err
}

func (t *localTaskStore) reclaimData() error {
Expand Down Expand Up @@ -623,7 +646,7 @@ func (t *localTaskStore) reclaimMeta() error {
return nil
}

func (t *localTaskStore) saveMetadata() error {
func (t *localTaskStore) saveMetadata() (err error) {
t.Lock()
defer t.Unlock()
data, err := json.Marshal(t.persistentMetadata)
Expand All @@ -634,7 +657,12 @@ func (t *localTaskStore) saveMetadata() error {
if err != nil {
return err
}
defer metadata.Close()
defer func() {
errClose := metadata.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()
_, err = metadata.Write(data)
if err != nil {
t.Errorf("save metadata error: %s", err)
Expand Down
32 changes: 24 additions & 8 deletions client/daemon/storage/local_storage_subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package storage

import (
"context"
"errors"
"io"
"os"
"sync"
Expand All @@ -44,14 +45,14 @@ type localSubTaskStore struct {
Range *http.Range
}

func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error) {
func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) (n int64, err error) {
// piece already exists
t.RLock()
if piece, ok := t.Pieces[req.Num]; ok {
t.RUnlock()
t.Debugf("piece %d already exist,ignore writing piece", req.Num)
// discard already downloaded data for back source
n, err := io.CopyN(io.Discard, req.Reader, piece.Range.Length)
n, err = io.CopyN(io.Discard, req.Reader, piece.Range.Length)
if err != nil && err != io.EOF {
return n, err
}
Expand All @@ -72,13 +73,18 @@ func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceReque
if err != nil {
return 0, err
}
defer file.Close()
defer func() {
errClose := file.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()
// TODO different with localTaskStore
if _, err = file.Seek(t.Range.Start+req.Range.Start, io.SeekStart); err != nil {
return 0, err
}

n, err := io.Copy(file, io.LimitReader(req.Reader, req.Range.Length))
n, err = io.Copy(file, io.LimitReader(req.Reader, req.Range.Length))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -275,7 +281,7 @@ func (t *localSubTaskStore) UpdateTask(ctx context.Context, req *UpdateTaskReque
return nil
}

func (t *localSubTaskStore) Store(ctx context.Context, req *StoreRequest) error {
func (t *localSubTaskStore) Store(ctx context.Context, req *StoreRequest) (err error) {
// Store is called in callback.Done, mark local task store done, for fast search
t.Done = true
t.parent.touch()
Expand All @@ -296,7 +302,7 @@ func (t *localSubTaskStore) Store(ctx context.Context, req *StoreRequest) error
return hardlink(t.SugaredLoggerOnWith, req.Destination, t.parent.DataFilePath)
}

_, err := os.Stat(req.Destination)
_, err = os.Stat(req.Destination)
if err == nil {
// remove exist file
t.Infof("destination file %q exists, purge it first", req.Destination)
Expand All @@ -308,7 +314,12 @@ func (t *localSubTaskStore) Store(ctx context.Context, req *StoreRequest) error
t.Debugf("open tasks data error: %s", err)
return err
}
defer file.Close()
defer func() {
errClose := file.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()

_, err = file.Seek(t.Range.Start, io.SeekStart)
if err != nil {
Expand All @@ -320,7 +331,12 @@ func (t *localSubTaskStore) Store(ctx context.Context, req *StoreRequest) error
t.Errorf("open tasks destination file error: %s", err)
return err
}
defer dstFile.Close()
defer func() {
errClose := dstFile.Close()
if errClose != nil {
err = errors.Join(err, errClose)
}
}()
// copy_file_range is valid in linux
// https://go-review.googlesource.com/c/go/+/229101/
n, err := io.Copy(dstFile, io.LimitReader(file, t.ContentLength))
Expand Down
Loading

0 comments on commit 342438e

Please sign in to comment.