Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initialize object storage client in the New func #2682

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat: initialize object storage client in the New func
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Aug 29, 2023
commit 19df2755d2e713635e54d47ec49250de41efff5f
117 changes: 37 additions & 80 deletions client/daemon/objectstorage/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,36 @@ type ObjectStorage interface {
// objectStorage provides object storage function.
type objectStorage struct {
*http.Server
config *config.DaemonOption
dynconfig config.Dynconfig
peerTaskManager peer.TaskManager
storageManager storage.Manager
peerIDGenerator peer.IDGenerator
config *config.DaemonOption
dynconfig config.Dynconfig
objectStorageClient objectstorage.ObjectStorage
peerTaskManager peer.TaskManager
storageManager storage.Manager
peerIDGenerator peer.IDGenerator
}

// New returns a new ObjectStorage instence.
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager, logDir string) (ObjectStorage, error) {
// Initialize object storage client.
config, err := dynconfig.GetObjectStorage()
if err != nil {
return nil, err
}

objectStorageClient, err := objectstorage.New(config.Name, config.Region, config.Endpoint,
config.AccessKey, config.SecretKey, objectstorage.WithS3ForcePathStyle(config.S3ForcePathStyle))
if err != nil {
return nil, err
}

// Initialize object storage server.
o := &objectStorage{
config: cfg,
dynconfig: dynconfig,
peerTaskManager: peerTaskManager,
storageManager: storageManager,
peerIDGenerator: peer.NewPeerIDGenerator(cfg.Host.AdvertiseIP.String()),
config: cfg,
dynconfig: dynconfig,
objectStorageClient: objectStorageClient,
peerTaskManager: peerTaskManager,
storageManager: storageManager,
peerIDGenerator: peer.NewPeerIDGenerator(cfg.Host.AdvertiseIP.String()),
}

router := o.initRouter(cfg, logDir)
Expand Down Expand Up @@ -201,13 +216,7 @@ func (o *objectStorage) headObject(ctx *gin.Context) {
objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator))
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

meta, isExist, err := client.GetObjectMetadata(ctx, bucketName, objectKey)
meta, isExist, err := o.objectStorageClient.GetObjectMetadata(ctx, bucketName, objectKey)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -260,13 +269,7 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
urlMeta.Filter = filter
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

meta, isExist, err := client.GetObjectMetadata(ctx, bucketName, objectKey)
meta, isExist, err := o.objectStorageClient.GetObjectMetadata(ctx, bucketName, objectKey)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -297,7 +300,7 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
urlMeta.Digest = ""
}

signURL, err := client.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
signURL, err := o.objectStorageClient.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -338,19 +341,13 @@ func (o *objectStorage) destroyObject(ctx *gin.Context) {
return
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

var (
bucketName = params.ID
objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator))
)

logger.Infof("destroy object %s in bucket %s", objectKey, bucketName)
if err := client.DeleteObject(ctx, bucketName, objectKey); err != nil {
if err := o.objectStorageClient.DeleteObject(ctx, bucketName, objectKey); err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
Expand Down Expand Up @@ -388,13 +385,7 @@ func (o *objectStorage) putObject(ctx *gin.Context) {
fileHeader = form.File
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

signURL, err := client.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
signURL, err := o.objectStorageClient.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -454,7 +445,7 @@ func (o *objectStorage) putObject(ctx *gin.Context) {

// Import object to object storage.
log.Infof("import object %s to bucket %s", objectKey, bucketName)
if err := o.importObjectToBackend(ctx, bucketName, objectKey, dgst, fileHeader, client); err != nil {
if err := o.importObjectToBackend(ctx, bucketName, objectKey, dgst, fileHeader); err != nil {
log.Error(err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand All @@ -473,7 +464,7 @@ func (o *objectStorage) putObject(ctx *gin.Context) {
// Import object to object storage.
go func() {
log.Infof("import object %s to bucket %s", objectKey, bucketName)
if err := o.importObjectToBackend(context.Background(), bucketName, objectKey, dgst, fileHeader, client); err != nil {
if err := o.importObjectToBackend(context.Background(), bucketName, objectKey, dgst, fileHeader); err != nil {
log.Errorf("import object %s to bucket %s failed: %s", objectKey, bucketName, err.Error())
return
}
Expand All @@ -495,16 +486,10 @@ func (o *objectStorage) createBucket(ctx *gin.Context) {
return
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

bucketName := params.ID

logger.Infof("create bucket %s ", bucketName)
if err := client.CreateBucket(ctx, bucketName); err != nil {
if err := o.objectStorageClient.CreateBucket(ctx, bucketName); err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
Expand Down Expand Up @@ -534,14 +519,8 @@ func (o *objectStorage) getObjectMetadatas(ctx *gin.Context) {
limit = query.Limit
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

logger.Infof("get object metadatas in bucket %s", bucketName)
metadatas, err := client.GetObjectMetadatas(ctx, bucketName, prefix, marker, delimiter, limit)
metadatas, err := o.objectStorageClient.GetObjectMetadatas(ctx, bucketName, prefix, marker, delimiter, limit)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -570,14 +549,8 @@ func (o *objectStorage) copyObject(ctx *gin.Context) {
source = form.SourceObjectKey
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

logger.Infof("copy object from %s to %s", source, destination)
if err := client.CopyObject(ctx, bucketName, source, destination); err != nil {
if err := o.objectStorageClient.CopyObject(ctx, bucketName, source, destination); err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
Expand All @@ -601,7 +574,7 @@ func (o *objectStorage) md5FromFileHeader(fileHeader *multipart.FileHeader) (dig
}

// importObjectToBackend uses to import object to backend.
func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, objectKey string, dgst *digest.Digest, fileHeader *multipart.FileHeader, client objectstorage.ObjectStorage) (err error) {
func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, objectKey string, dgst *digest.Digest, fileHeader *multipart.FileHeader) (err error) {
f, err := fileHeader.Open()
if err != nil {
return err
Expand All @@ -612,7 +585,7 @@ func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, o
}
}()

return client.PutObject(ctx, bucketName, objectKey, dgst.String(), f)
return o.objectStorageClient.PutObject(ctx, bucketName, objectKey, dgst.String(), f)
}

// importObjectToSeedPeers uses to import object to local storage.
Expand Down Expand Up @@ -742,19 +715,3 @@ func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost

return nil
}

// client uses to generate client of object storage.
func (o *objectStorage) client() (objectstorage.ObjectStorage, error) {
config, err := o.dynconfig.GetObjectStorage()
if err != nil {
return nil, err
}

client, err := objectstorage.New(config.Name, config.Region, config.Endpoint,
config.AccessKey, config.SecretKey, objectstorage.WithS3ForcePathStyle(config.S3ForcePathStyle))
if err != nil {
return nil, err
}

return client, nil
}