Skip to content

Commit

Permalink
set thread number for redirect-to-resource requests
Browse files Browse the repository at this point in the history
  • Loading branch information
iychoi committed Jun 6, 2024
1 parent cc56ec6 commit 0b9cc3b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 19 deletions.
8 changes: 4 additions & 4 deletions fs/fs_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (fs *FileSystem) DownloadFileParallelResumable(irodsPath string, resource s
}

// DownloadFileRedirectToResource downloads a file from resource to local in parallel
func (fs *FileSystem) DownloadFileRedirectToResource(irodsPath string, resource string, localPath string, callback common.TrackerCallBack) error {
func (fs *FileSystem) DownloadFileRedirectToResource(irodsPath string, resource string, localPath string, taskNum int, callback common.TrackerCallBack) error {
irodsSrcPath := util.GetCorrectIRODSPath(irodsPath)
localDestPath := util.GetCorrectLocalPath(localPath)

Expand Down Expand Up @@ -195,7 +195,7 @@ func (fs *FileSystem) DownloadFileRedirectToResource(irodsPath string, resource
}
}

return irods_fs.DownloadDataObjectFromResourceServer(fs.ioSession, irodsSrcPath, resource, localFilePath, srcStat.Size, callback)
return irods_fs.DownloadDataObjectFromResourceServer(fs.ioSession, irodsSrcPath, resource, localFilePath, srcStat.Size, taskNum, callback)
}

// UploadFile uploads a local file to irods
Expand Down Expand Up @@ -325,7 +325,7 @@ func (fs *FileSystem) UploadFileParallel(localPath string, irodsPath string, res
}

// UploadFileParallelRedirectToResource uploads a file from local to resource server in parallel
func (fs *FileSystem) UploadFileParallelRedirectToResource(localPath string, irodsPath string, resource string, replicate bool, callback common.TrackerCallBack) error {
func (fs *FileSystem) UploadFileParallelRedirectToResource(localPath string, irodsPath string, resource string, taskNum int, replicate bool, callback common.TrackerCallBack) error {
localSrcPath := util.GetCorrectLocalPath(localPath)
irodsDestPath := util.GetCorrectIRODSPath(irodsPath)

Expand Down Expand Up @@ -361,7 +361,7 @@ func (fs *FileSystem) UploadFileParallelRedirectToResource(localPath string, iro
}
}

err = irods_fs.UploadDataObjectToResourceServer(fs.ioSession, localSrcPath, irodsFilePath, resource, replicate, callback)
err = irods_fs.UploadDataObjectToResourceServer(fs.ioSession, localSrcPath, irodsFilePath, resource, taskNum, replicate, callback)
if err != nil {
return err
}
Expand Down
36 changes: 28 additions & 8 deletions irods/fs/data_object_resource_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// GetDataObjectRedirectionInfoForGet returns a redirection info for accessing the data object for downloading
func GetDataObjectRedirectionInfoForGet(conn *connection.IRODSConnection, path string, resource string, fileLength int64) (*types.IRODSFileOpenRedirectionHandle, error) {
func GetDataObjectRedirectionInfoForGet(conn *connection.IRODSConnection, path string, resource string, fileLength int64, taskNum int) (*types.IRODSFileOpenRedirectionHandle, error) {
if conn == nil || !conn.IsConnected() {
return nil, xerrors.Errorf("connection is nil or disconnected")
}
Expand All @@ -37,7 +37,12 @@ func GetDataObjectRedirectionInfoForGet(conn *connection.IRODSConnection, path s
resource = account.DefaultResource
}

request := message.NewIRODSMessageGetDataObjectRequest(path, resource, fileLength)
numTasks := taskNum
if numTasks <= 0 {
numTasks = util.GetNumTasksForParallelTransfer(fileLength)
}

request := message.NewIRODSMessageGetDataObjectRequest(path, resource, fileLength, numTasks)
response := message.IRODSMessageGetDataObjectResponse{}
err := conn.RequestAndCheck(request, &response, nil)
if err != nil {
Expand Down Expand Up @@ -76,7 +81,7 @@ func GetDataObjectRedirectionInfoForGet(conn *connection.IRODSConnection, path s
}

// GetDataObjectRedirectionInfoForPut returns a redirection info for accessing the data object for uploading
func GetDataObjectRedirectionInfoForPut(conn *connection.IRODSConnection, path string, resource string, fileLength int64) (*types.IRODSFileOpenRedirectionHandle, error) {
func GetDataObjectRedirectionInfoForPut(conn *connection.IRODSConnection, path string, resource string, fileLength int64, taskNum int) (*types.IRODSFileOpenRedirectionHandle, error) {
if conn == nil || !conn.IsConnected() {
return nil, xerrors.Errorf("connection is nil or disconnected")
}
Expand All @@ -96,7 +101,12 @@ func GetDataObjectRedirectionInfoForPut(conn *connection.IRODSConnection, path s
resource = account.DefaultResource
}

request := message.NewIRODSMessagePutDataObjectRequest(path, resource, fileLength)
numTasks := taskNum
if numTasks <= 0 {
numTasks = util.GetNumTasksForParallelTransfer(fileLength)
}

request := message.NewIRODSMessagePutDataObjectRequest(path, resource, fileLength, numTasks)
response := message.IRODSMessagePutDataObjectResponse{}
err := conn.RequestAndCheck(request, &response, nil)
if err != nil {
Expand Down Expand Up @@ -535,7 +545,7 @@ func uploadDataObjectChunkToResourceServer(sess *session.IRODSSession, controlCo
}

// DownloadDataObjectFromResourceServer downloads a data object at the iRODS path to the local path
func DownloadDataObjectFromResourceServer(session *session.IRODSSession, irodsPath string, resource string, localPath string, fileLength int64, callback common.TrackerCallBack) error {
func DownloadDataObjectFromResourceServer(session *session.IRODSSession, irodsPath string, resource string, localPath string, fileLength int64, taskNum int, callback common.TrackerCallBack) error {
logger := log.WithFields(log.Fields{
"package": "fs",
"function": "DownloadDataObjectFromResourceServer",
Expand All @@ -549,6 +559,11 @@ func DownloadDataObjectFromResourceServer(session *session.IRODSSession, irodsPa
resource = account.DefaultResource
}

numTasks := taskNum
if numTasks <= 0 {
numTasks = util.GetNumTasksForParallelTransfer(fileLength)
}

conn, err := session.AcquireConnection()
if err != nil {
return xerrors.Errorf("failed to get connection: %w", err)
Expand All @@ -559,7 +574,7 @@ func DownloadDataObjectFromResourceServer(session *session.IRODSSession, irodsPa
return xerrors.Errorf("connection is nil or disconnected")
}

handle, err := GetDataObjectRedirectionInfoForGet(conn, irodsPath, resource, fileLength)
handle, err := GetDataObjectRedirectionInfoForGet(conn, irodsPath, resource, fileLength, numTasks)
if err != nil {
logger.Debugf("failed to get redirection info for data object %s, switch to DownloadDataObjectParallel: %s", irodsPath, err.Error())

Expand Down Expand Up @@ -645,7 +660,7 @@ func DownloadDataObjectFromResourceServer(session *session.IRODSSession, irodsPa
}

// UploadDataObjectToResourceServer uploads a data object at the local path to the iRODS path
func UploadDataObjectToResourceServer(session *session.IRODSSession, localPath string, irodsPath string, resource string, replicate bool, callback common.TrackerCallBack) error {
func UploadDataObjectToResourceServer(session *session.IRODSSession, localPath string, irodsPath string, resource string, taskNum int, replicate bool, callback common.TrackerCallBack) error {
logger := log.WithFields(log.Fields{
"package": "fs",
"function": "UploadDataObjectToResourceServer",
Expand All @@ -666,6 +681,11 @@ func UploadDataObjectToResourceServer(session *session.IRODSSession, localPath s

fileLength := stat.Size()

numTasks := taskNum
if numTasks <= 0 {
numTasks = util.GetNumTasksForParallelTransfer(fileLength)
}

conn, err := session.AcquireConnection()
if err != nil {
return xerrors.Errorf("failed to get connection: %w", err)
Expand All @@ -676,7 +696,7 @@ func UploadDataObjectToResourceServer(session *session.IRODSSession, localPath s
return xerrors.Errorf("connection is nil or disconnected")
}

handle, err := GetDataObjectRedirectionInfoForPut(conn, irodsPath, resource, fileLength)
handle, err := GetDataObjectRedirectionInfoForPut(conn, irodsPath, resource, fileLength, numTasks)
if err != nil {
logger.Debugf("failed to get redirection info for data object %s, switch to UploadDataObjctParallel: %s", irodsPath, err.Error())

Expand Down
4 changes: 2 additions & 2 deletions irods/message/get_data_object_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
type IRODSMessageGetDataObjectRequest IRODSMessageDataObjectRequest

// NewIRODSMessageGetDataObjectRequest creates a IRODSMessageGetDataObjectRequest message
func NewIRODSMessageGetDataObjectRequest(path string, resource string, fileLength int64) *IRODSMessageGetDataObjectRequest {
func NewIRODSMessageGetDataObjectRequest(path string, resource string, fileLength int64, threads int) *IRODSMessageGetDataObjectRequest {
request := &IRODSMessageGetDataObjectRequest{
Path: path,
CreateMode: 0,
OpenFlags: 0,
Offset: 0,
Size: fileLength,
Threads: 0,
Threads: threads,
OperationType: int(common.OPER_TYPE_GET_DATA_OBJ),
KeyVals: IRODSMessageSSKeyVal{
Length: 0,
Expand Down
4 changes: 2 additions & 2 deletions irods/message/put_data_object_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
type IRODSMessagePutDataObjectRequest IRODSMessageDataObjectRequest

// NewIRODSMessagePutDataObjectRequest creates a IRODSMessagePutDataObjectRequest message
func NewIRODSMessagePutDataObjectRequest(path string, resource string, fileLength int64) *IRODSMessagePutDataObjectRequest {
func NewIRODSMessagePutDataObjectRequest(path string, resource string, fileLength int64, threads int) *IRODSMessagePutDataObjectRequest {
request := &IRODSMessagePutDataObjectRequest{
Path: path,
CreateMode: 0,
OpenFlags: 0,
Offset: 0,
Size: fileLength,
Threads: 0,
Threads: threads,
OperationType: int(common.OPER_TYPE_PUT_DATA_OBJ),
KeyVals: IRODSMessageSSKeyVal{
Length: 0,
Expand Down
6 changes: 3 additions & 3 deletions test/testcases/redirection_to_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func testDownloadDataObjectFromResourceServer(t *testing.T) {
assert.Equal(t, int64(fileSize), obj.Size)

// get
err = fs.DownloadDataObjectFromResourceServer(sess, irodsPath, "", filename, int64(fileSize), callBack)
err = fs.DownloadDataObjectFromResourceServer(sess, irodsPath, "", filename, int64(fileSize), 0, callBack)
failError(t, err)

checksumNew, err := util.HashLocalFile(filename, string(types.ChecksumAlgorithmSHA1))
Expand Down Expand Up @@ -131,7 +131,7 @@ func testUploadDataObjectFromResourceServer(t *testing.T) {
callbackCalled++
}

err = fs.UploadDataObjectToResourceServer(sess, filepath, irodsPath, "", false, callBack)
err = fs.UploadDataObjectToResourceServer(sess, filepath, irodsPath, "", 0, false, callBack)
failError(t, err)
assert.Greater(t, callbackCalled, 10) // at least called 10 times

Expand All @@ -151,7 +151,7 @@ func testUploadDataObjectFromResourceServer(t *testing.T) {
assert.Equal(t, int64(fileSize), obj.Size)

// get
err = fs.DownloadDataObjectFromResourceServer(sess, irodsPath, "", filename, int64(fileSize), callBack)
err = fs.DownloadDataObjectFromResourceServer(sess, irodsPath, "", filename, int64(fileSize), 0, callBack)
failError(t, err)

checksumNew, err := util.HashLocalFile(filename, string(types.ChecksumAlgorithmSHA1))
Expand Down

0 comments on commit 0b9cc3b

Please sign in to comment.