Skip to content

Commit

Permalink
Fix parallel upload file close bug
Browse files Browse the repository at this point in the history
  • Loading branch information
iychoi committed Jul 28, 2023
1 parent dbc16d0 commit 9c8b5b7
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 43 deletions.
11 changes: 9 additions & 2 deletions irods/fs/data_object_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func CloseDataObjectReplica(conn *connection.IRODSConnection, handle *types.IROD
return xerrors.Errorf("does not support close replica in current iRODS Version")
}

request := message.NewIRODSMessageCloseDataObjectReplicaRequest(handle.FileDescriptor, false, false, false, false)
request := message.NewIRODSMessageCloseDataObjectReplicaRequest(handle.FileDescriptor, false, false, false, false, false)
response := message.IRODSMessageCloseDataObjectReplicaResponse{}
err := conn.RequestAndCheck(request, &response, nil)
if err != nil {
Expand Down Expand Up @@ -308,7 +308,12 @@ func UploadDataObjectParallel(session *session.IRODSSession, localPath string, i
errChan <- taskErr
return
}
defer CloseDataObjectReplica(taskConn, taskHandle)
defer func() {
errClose := CloseDataObjectReplica(taskConn, taskHandle)
if errClose != nil {
errChan <- errClose
}
}()

f, taskErr := os.OpenFile(localPath, os.O_RDONLY, 0)
if taskErr != nil {
Expand Down Expand Up @@ -341,8 +346,10 @@ func UploadDataObjectParallel(session *session.IRODSSession, localPath string, i

bytesRead, taskReadErr := f.ReadAt(buffer[:bufferLen], taskOffset+(taskLength-taskRemain))
if bytesRead > 0 {
//logger.Debugf("upload '%s' offset: %d, length: %d", irodsPath, taskOffset+(taskLength-taskRemain), bytesRead)
taskWriteErr := WriteDataObjectWithTrackerCallBack(taskConn, taskHandle, buffer[:bytesRead], blockWriteCallback)
if taskWriteErr != nil {
//logger.WithError(taskWriteErr).Debugf("upload '%s' failed offset: %d, length: %d", irodsPath, taskOffset+(taskLength-taskRemain), bytesRead)
errChan <- taskWriteErr
return
}
Expand Down
55 changes: 41 additions & 14 deletions irods/message/close_data_object_replica_request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package message

import (
"encoding/base64"
"encoding/json"
"encoding/xml"

"github.com/cyverse/go-irodsclient/irods/common"
"golang.org/x/xerrors"
Expand All @@ -11,38 +13,63 @@ import (
// Uses a JSON body that is base64-encoded and wrapped in an XML BinBytesBuf (see GetBytes)
// Supported v4.2.9 or above
type IRODSMessageCloseDataObjectReplicaRequest struct {
FileDescriptor int `json:"fd"`
SendNotification bool `json:"send_notification"`
UpdateSize bool `json:"update_size"`
UpdateStatus bool `json:"update_status"`
ComputeChecksum bool `json:"compute_checksum"`
FileDescriptor int `json:"fd"`
SendNotification bool `json:"send_notification"`
UpdateSize bool `json:"update_size"`
UpdateStatus bool `json:"update_status"`
ComputeChecksum bool `json:"compute_checksum"`
PreserveReplicaStateTable bool `json:"preserve_replica_state_table"`
}

// NewIRODSMessageCloseDataObjectReplicaRequest creates an IRODSMessageCloseDataObjectReplicaRequest message
func NewIRODSMessageCloseDataObjectReplicaRequest(desc int, sendNotification bool, updateSize bool, updateStatus bool, computeChecksum bool) *IRODSMessageCloseDataObjectReplicaRequest {
func NewIRODSMessageCloseDataObjectReplicaRequest(desc int, sendNotification bool, updateSize bool, updateStatus bool, computeChecksum bool, preserveReplicaStateTable bool) *IRODSMessageCloseDataObjectReplicaRequest {
request := &IRODSMessageCloseDataObjectReplicaRequest{
FileDescriptor: desc,
SendNotification: sendNotification,
UpdateSize: updateSize,
UpdateStatus: updateStatus,
ComputeChecksum: computeChecksum,
FileDescriptor: desc,
SendNotification: sendNotification,
UpdateSize: updateSize,
UpdateStatus: updateStatus,
ComputeChecksum: computeChecksum,
PreserveReplicaStateTable: preserveReplicaStateTable,
}

return request
}

// GetBytes returns byte array
func (msg *IRODSMessageCloseDataObjectReplicaRequest) GetBytes() ([]byte, error) {
jsonBytes, err := json.Marshal(msg)
jsonBody, err := json.Marshal(msg)
if err != nil {
return nil, xerrors.Errorf("failed to marshal irods message to json: %w", err)
}
return jsonBytes, nil

jsonBodyBin := base64.StdEncoding.EncodeToString(jsonBody)

binBytesBuf := IRODSMessageBinBytesBuf{
Length: len(jsonBody), // use original data's length
Data: jsonBodyBin,
}

xmlBytes, err := xml.Marshal(binBytesBuf)
if err != nil {
return nil, xerrors.Errorf("failed to marshal irods message to xml: %w", err)
}
return xmlBytes, nil
}

// FromBytes returns struct from bytes
func (msg *IRODSMessageCloseDataObjectReplicaRequest) FromBytes(bytes []byte) error {
err := json.Unmarshal(bytes, msg)
binBytesBuf := IRODSMessageBinBytesBuf{}
err := xml.Unmarshal(bytes, &binBytesBuf)
if err != nil {
return xerrors.Errorf("failed to marshal irods message to xml: %w", err)
}

jsonBody, err := base64.StdEncoding.DecodeString(binBytesBuf.Data)
if err != nil {
return xerrors.Errorf("failed to decode base64 data: %w", err)
}

err = json.Unmarshal(jsonBody, msg)
if err != nil {
return xerrors.Errorf("failed to unmarshal json to irods message: %w", err)
}
Expand Down
85 changes: 58 additions & 27 deletions test/testcases/bulk_fs_api_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package testcases

import (
"fmt"
"os"
"sync"
"testing"

"github.com/cyverse/go-irodsclient/irods/fs"
Expand All @@ -26,6 +28,7 @@ func TestBulkFSAPI(t *testing.T) {

t.Run("test ParallelUploadDataObject", testParallelUploadDataObject)
t.Run("test ParallelUploadReplication", testParallelUploadReplication)
t.Run("test ParallelUploadReplicationMulti", testParallelUploadReplicationMulti)
}

func testParallelUploadDataObject(t *testing.T) {
Expand Down Expand Up @@ -108,26 +111,13 @@ func testParallelUploadDataObject(t *testing.T) {
sess.ReturnConnection(conn)
}

func testParallelUploadReplication(t *testing.T) {
account := GetTestAccount()

account.ClientServerNegotiation = false

sessionConfig := session.NewIRODSSessionConfigWithDefault("go-irodsclient-test")

sess, err := session.NewIRODSSession(account, sessionConfig)
failError(t, err)

conn, err := sess.AcquireConnection()
failError(t, err)

func parallelUploadReplication(t *testing.T, sess *session.IRODSSession, filename string) {
homedir := getHomeDir(bulkFSAPITestID)

// gen a large file, 50MB is enough
testval := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" // 62
fileSize := 50 * 1024 * 1024 // 50MB

filename := "test_repl_file.bin"
bufSize := 1024
buf := make([]byte, bufSize)

Expand Down Expand Up @@ -161,33 +151,74 @@ func testParallelUploadReplication(t *testing.T) {
err = os.Remove(filename)
failError(t, err)

sess.ReturnConnection(conn)
sess.Release()

// reconnect
sess, err = session.NewIRODSSession(account, sessionConfig)
failError(t, err)
defer sess.Release()

conn, err = sess.AcquireConnection()
newConn, err := sess.AcquireConnection()
failError(t, err)

coll, err := fs.GetCollection(conn, homedir)
coll, err := fs.GetCollection(newConn, homedir)
failError(t, err)

obj, err := fs.GetDataObject(conn, coll, filename)
obj, err := fs.GetDataObject(newConn, coll, filename)
failError(t, err)

assert.NotEmpty(t, obj.ID)
assert.Equal(t, int64(fileSize), obj.Size)
if obj.Size != int64(fileSize) {
t.Logf("error file - %s", irodsPath)
t.FailNow()
}
assert.Equal(t, 2, len(obj.Replicas))

assert.Equal(t, obj.Replicas[0].CheckSum, obj.Replicas[1].CheckSum)
assert.Equal(t, obj.Replicas[0].Status, obj.Replicas[1].Status)

// delete
err = fs.DeleteDataObject(conn, irodsPath, true)
err = fs.DeleteDataObject(newConn, irodsPath, true)
failError(t, err)

sess.ReturnConnection(conn)
sess.ReturnConnection(newConn)
}

// testParallelUploadReplicationMulti stresses the parallel upload path by
// running several parallelUploadReplication calls concurrently against one
// shared session, and repeating that batch a number of times.
//
// Each worker in a batch uploads a distinctly named file
// ("test_repl_file_<i>.bin"); the same names are reused by the next batch.
func testParallelUploadReplicationMulti(t *testing.T) {
	const (
		rounds          = 10 // how many times the whole batch is repeated
		uploadsPerRound = 10 // concurrent uploads within one batch
		filenamePattern = "test_repl_file_%d.bin"
	)

	account := GetTestAccount()

	account.ClientServerNegotiation = false

	sessionConfig := session.NewIRODSSessionConfigWithDefault("go-irodsclient-test")

	sess, err := session.NewIRODSSession(account, sessionConfig)
	failError(t, err)
	// Deferred so the session is released on every exit path of this test.
	defer sess.Release()

	for round := 0; round < rounds; round++ {
		var wg sync.WaitGroup

		for i := 0; i < uploadsPerRound; i++ {
			wg.Add(1)
			filename := fmt.Sprintf(filenamePattern, i)

			go func(sess *session.IRODSSession, filename string) {
				// Done must be deferred: the helper may abort through
				// t.FailNow, which terminates this goroutine with
				// runtime.Goexit. Deferred calls still run in that case;
				// a plain trailing wg.Done() would be skipped and
				// wg.Wait() below would block forever.
				defer wg.Done()
				parallelUploadReplication(t, sess, filename)
			}(sess, filename)
		}

		wg.Wait()

		// NOTE(review): a FailNow issued from a worker goroutine only stops
		// that worker, not this test function. Stop here once any worker
		// has marked the test as failed instead of running further rounds.
		if t.Failed() {
			return
		}
	}
}

// testParallelUploadReplication runs a single parallelUploadReplication
// round-trip for "test_repl_file.bin" on a freshly created session.
func testParallelUploadReplication(t *testing.T) {
	account := GetTestAccount()

	account.ClientServerNegotiation = false

	sessionConfig := session.NewIRODSSessionConfigWithDefault("go-irodsclient-test")

	sess, err := session.NewIRODSSession(account, sessionConfig)
	failError(t, err)
	// Deferred so the session is also released when the helper aborts the
	// test (it calls t.FailNow on a size mismatch), not only on success.
	defer sess.Release()

	parallelUploadReplication(t, sess, "test_repl_file.bin")
}

0 comments on commit 9c8b5b7

Please sign in to comment.