Skip to content

Commit

Permalink
feat: initialize object storage client in the New func
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Aug 29, 2023
1 parent c0522c1 commit 19df275
Showing 1 changed file with 37 additions and 80 deletions.
117 changes: 37 additions & 80 deletions client/daemon/objectstorage/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,36 @@ type ObjectStorage interface {
// objectStorage provides object storage function.
type objectStorage struct {
*http.Server
config *config.DaemonOption
dynconfig config.Dynconfig
peerTaskManager peer.TaskManager
storageManager storage.Manager
peerIDGenerator peer.IDGenerator
config *config.DaemonOption
dynconfig config.Dynconfig
objectStorageClient objectstorage.ObjectStorage
peerTaskManager peer.TaskManager
storageManager storage.Manager
peerIDGenerator peer.IDGenerator
}

// New returns a new ObjectStorage instence.
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager, logDir string) (ObjectStorage, error) {
// Initialize object storage client.
config, err := dynconfig.GetObjectStorage()
if err != nil {
return nil, err
}

objectStorageClient, err := objectstorage.New(config.Name, config.Region, config.Endpoint,
config.AccessKey, config.SecretKey, objectstorage.WithS3ForcePathStyle(config.S3ForcePathStyle))
if err != nil {
return nil, err
}

// Initialize object storage server.
o := &objectStorage{
config: cfg,
dynconfig: dynconfig,
peerTaskManager: peerTaskManager,
storageManager: storageManager,
peerIDGenerator: peer.NewPeerIDGenerator(cfg.Host.AdvertiseIP.String()),
config: cfg,
dynconfig: dynconfig,
objectStorageClient: objectStorageClient,
peerTaskManager: peerTaskManager,
storageManager: storageManager,
peerIDGenerator: peer.NewPeerIDGenerator(cfg.Host.AdvertiseIP.String()),
}

router := o.initRouter(cfg, logDir)
Expand Down Expand Up @@ -201,13 +216,7 @@ func (o *objectStorage) headObject(ctx *gin.Context) {
objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator))
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

meta, isExist, err := client.GetObjectMetadata(ctx, bucketName, objectKey)
meta, isExist, err := o.objectStorageClient.GetObjectMetadata(ctx, bucketName, objectKey)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -260,13 +269,7 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
urlMeta.Filter = filter
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

meta, isExist, err := client.GetObjectMetadata(ctx, bucketName, objectKey)
meta, isExist, err := o.objectStorageClient.GetObjectMetadata(ctx, bucketName, objectKey)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -297,7 +300,7 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
urlMeta.Digest = ""
}

signURL, err := client.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
signURL, err := o.objectStorageClient.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -338,19 +341,13 @@ func (o *objectStorage) destroyObject(ctx *gin.Context) {
return
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

var (
bucketName = params.ID
objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator))
)

logger.Infof("destroy object %s in bucket %s", objectKey, bucketName)
if err := client.DeleteObject(ctx, bucketName, objectKey); err != nil {
if err := o.objectStorageClient.DeleteObject(ctx, bucketName, objectKey); err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
Expand Down Expand Up @@ -388,13 +385,7 @@ func (o *objectStorage) putObject(ctx *gin.Context) {
fileHeader = form.File
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

signURL, err := client.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
signURL, err := o.objectStorageClient.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -454,7 +445,7 @@ func (o *objectStorage) putObject(ctx *gin.Context) {

// Import object to object storage.
log.Infof("import object %s to bucket %s", objectKey, bucketName)
if err := o.importObjectToBackend(ctx, bucketName, objectKey, dgst, fileHeader, client); err != nil {
if err := o.importObjectToBackend(ctx, bucketName, objectKey, dgst, fileHeader); err != nil {
log.Error(err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand All @@ -473,7 +464,7 @@ func (o *objectStorage) putObject(ctx *gin.Context) {
// Import object to object storage.
go func() {
log.Infof("import object %s to bucket %s", objectKey, bucketName)
if err := o.importObjectToBackend(context.Background(), bucketName, objectKey, dgst, fileHeader, client); err != nil {
if err := o.importObjectToBackend(context.Background(), bucketName, objectKey, dgst, fileHeader); err != nil {
log.Errorf("import object %s to bucket %s failed: %s", objectKey, bucketName, err.Error())
return
}
Expand All @@ -495,16 +486,10 @@ func (o *objectStorage) createBucket(ctx *gin.Context) {
return
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

bucketName := params.ID

logger.Infof("create bucket %s ", bucketName)
if err := client.CreateBucket(ctx, bucketName); err != nil {
if err := o.objectStorageClient.CreateBucket(ctx, bucketName); err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
Expand Down Expand Up @@ -534,14 +519,8 @@ func (o *objectStorage) getObjectMetadatas(ctx *gin.Context) {
limit = query.Limit
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

logger.Infof("get object metadatas in bucket %s", bucketName)
metadatas, err := client.GetObjectMetadatas(ctx, bucketName, prefix, marker, delimiter, limit)
metadatas, err := o.objectStorageClient.GetObjectMetadatas(ctx, bucketName, prefix, marker, delimiter, limit)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
Expand Down Expand Up @@ -570,14 +549,8 @@ func (o *objectStorage) copyObject(ctx *gin.Context) {
source = form.SourceObjectKey
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

logger.Infof("copy object from %s to %s", source, destination)
if err := client.CopyObject(ctx, bucketName, source, destination); err != nil {
if err := o.objectStorageClient.CopyObject(ctx, bucketName, source, destination); err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
Expand All @@ -601,7 +574,7 @@ func (o *objectStorage) md5FromFileHeader(fileHeader *multipart.FileHeader) (dig
}

// importObjectToBackend uses to import object to backend.
func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, objectKey string, dgst *digest.Digest, fileHeader *multipart.FileHeader, client objectstorage.ObjectStorage) (err error) {
func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, objectKey string, dgst *digest.Digest, fileHeader *multipart.FileHeader) (err error) {
f, err := fileHeader.Open()
if err != nil {
return err
Expand All @@ -612,7 +585,7 @@ func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, o
}
}()

return client.PutObject(ctx, bucketName, objectKey, dgst.String(), f)
return o.objectStorageClient.PutObject(ctx, bucketName, objectKey, dgst.String(), f)
}

// importObjectToSeedPeers uses to import object to local storage.
Expand Down Expand Up @@ -742,19 +715,3 @@ func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost

return nil
}

// client uses to generate client of object storage.
func (o *objectStorage) client() (objectstorage.ObjectStorage, error) {
config, err := o.dynconfig.GetObjectStorage()
if err != nil {
return nil, err
}

client, err := objectstorage.New(config.Name, config.Region, config.Endpoint,
config.AccessKey, config.SecretKey, objectstorage.WithS3ForcePathStyle(config.S3ForcePathStyle))
if err != nil {
return nil, err
}

return client, nil
}

0 comments on commit 19df275

Please sign in to comment.