Skip to content

Commit

Permalink
refactor: create model grpc api in manager (#2528)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jul 10, 2023
1 parent 6301d22 commit a8f7c56
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 201 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20

require (
d7y.io/api v1.9.4
d7y.io/api v1.9.5
github.com/MysteriousPotato/go-lockable v0.2.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down Expand Up @@ -111,7 +111,7 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand Down Expand Up @@ -216,7 +216,7 @@ require (
golang.org/x/net v0.11.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/tools v0.9.3 // indirect
golang.org/x/tools v0.10.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
Expand Down
14 changes: 7 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api v1.9.4 h1:oU4s9YLhsYmYStVHe/qJykpFPmia1S+kJX97pgqFRGs=
d7y.io/api v1.9.4/go.mod h1:6bn5Z+OyjyvlB1UMxUZsFbyx47qjkpNEvC25hq5Qxy0=
d7y.io/api v1.9.5 h1:meQlDj3L0KiCCK+NIDvKS/kVnDW1je8XZrH1KPBlacU=
d7y.io/api v1.9.5/go.mod h1:4uEVuq54So/EADecdIgIfx8CTmdASUpqrWM48oT/MH0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down Expand Up @@ -240,8 +240,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.1 h1:kt9FtLiooDc0vbwTLhdg3dyNX1K9Qwa1EK9LcD4jVUQ=
github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
Expand Down Expand Up @@ -1280,7 +1280,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.1-0.20210830214625-1b1db11ec8f4/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1563,8 +1563,8 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM=
golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
172 changes: 84 additions & 88 deletions manager/rpcserver/manager_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -51,6 +50,7 @@ import (
"d7y.io/dragonfly/v2/pkg/objectstorage"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/pkg/slices"
"d7y.io/dragonfly/v2/pkg/structure"
)

// managerServerV1 is v1 version of the manager grpc server.
Expand Down Expand Up @@ -627,11 +627,13 @@ func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.G

// List buckets configuration.
func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBucketsRequest) (*managerv1.ListBucketsResponse, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)

if !s.objectStorageConfig.Enable {
log.Warn("object storage is disabled")
return nil, status.Error(codes.Internal, "object storage is disabled")
}

log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
var pbListBucketsResponse managerv1.ListBucketsResponse
cacheKey := pkgredis.MakeBucketKeyInManager(s.objectStorageConfig.Name)

Expand Down Expand Up @@ -745,80 +747,73 @@ func (s *managerServerV1) ListApplications(ctx context.Context, req *managerv1.L

// CreateModel creates a model and uploads its data to object storage.
func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.CreateModelRequest) (*emptypb.Empty, error) {
log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp())

if !s.objectStorageConfig.Enable {
log.Warn("object storage is disabled")
return nil, status.Error(codes.Internal, "object storage is disabled")
}

IsExist, err := s.objectStorage.IsBucketExist(ctx, s.config.Trainer.BucketName)
if err != nil {
return nil, status.Error(codes.Internal, "find bucket exist failed")
}

if !IsExist {
if err := s.objectStorage.CreateBucket(ctx, s.config.Trainer.BucketName); err != nil {
return nil, status.Error(codes.Internal, "create model bucket failed")
}
// Create the model bucket if it does not exist.
if err := s.createModelBucket(ctx); err != nil {
log.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}

log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp())

var (
modelName string
modelType string
modelEvaluation map[string]any
modelVersion = time.Now().Format(types.ModelVersionTimeFormat)
name string
typ string
evaluation types.ModelEvaluation
version = time.Now().Nanosecond()
)

switch modelUploadRequest := req.GetRequest().(type) {
switch createModelRequest := req.GetRequest().(type) {
case *managerv1.CreateModelRequest_CreateGnnRequest:
modelName = types.MakeGNNModelName(req.GetClusterId())
modelType = models.ModelTypeGNN
modelEvaluation = map[string]any{
"Precision": modelUploadRequest.CreateGnnRequest.GetPrecision(),
"Recall": modelUploadRequest.CreateGnnRequest.GetRecall(),
"F1Score": modelUploadRequest.CreateGnnRequest.GetF1Score(),
}

IsExist, err := s.objectStorage.IsObjectExist(ctx, s.config.Trainer.BucketName, types.MakeObjectKeyOfModelConfigFile(modelName))
if err != nil {
log.Errorf("find GNN model config failed: %s", err.Error())
name = types.MakeGNNModelName(req.GetIp(), req.GetHostname(), req.GetClusterId())
typ = models.ModelTypeGNN
evaluation = types.ModelEvaluation{
Precision: createModelRequest.CreateGnnRequest.GetPrecision(),
Recall: createModelRequest.CreateGnnRequest.GetRecall(),
F1Score: createModelRequest.CreateGnnRequest.GetF1Score(),
}

if !IsExist {
if err = s.createGNNModelConfig(ctx, modelName, modelVersion); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// Update GNN model config to object storage.
if err := s.createModelConfig(ctx, name); err != nil {
log.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}

// Upload GNN model file to object storage.
data := createModelRequest.CreateGnnRequest.GetData()
dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromBytes(data))
if err := s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName,
types.MakeObjectKeyOfModelFile(modelName, modelVersion), digest.AlgorithmMD5, bytes.NewReader(req.GetCreateGnnRequest().GetData())); err != nil {
types.MakeObjectKeyOfModelFile(name, version), dgst.String(), bytes.NewReader(data)); err != nil {
log.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
case *managerv1.CreateModelRequest_CreateMlpRequest:
modelName = types.MakeMLPModelName(req.GetHostname(), req.GetIp(), req.GetClusterId())
modelType = models.ModelTypeMLP
modelEvaluation = map[string]any{
"Mse": modelUploadRequest.CreateMlpRequest.GetMse(),
"Mae": modelUploadRequest.CreateMlpRequest.GetMae(),
}

IsExist, err := s.objectStorage.IsObjectExist(ctx, s.config.Trainer.BucketName, types.MakeObjectKeyOfModelConfigFile(modelName))
if err != nil {
log.Errorf("find MLP model config failed: %s", err.Error())
name = types.MakeMLPModelName(req.GetHostname(), req.GetIp(), req.GetClusterId())
typ = models.ModelTypeMLP
evaluation = types.ModelEvaluation{
MSE: createModelRequest.CreateMlpRequest.GetMse(),
MAE: createModelRequest.CreateMlpRequest.GetMae(),
}

if !IsExist {
if err = s.createMLPModelConfig(ctx, modelName, modelVersion); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// Update MLP model config to object storage.
if err := s.createModelConfig(ctx, name); err != nil {
log.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}

// Upload MLP model file to object storage.
data := createModelRequest.CreateMlpRequest.GetData()
dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromBytes(data))
if err := s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName,
types.MakeObjectKeyOfModelFile(modelName, modelVersion), digest.AlgorithmMD5, bytes.NewReader(req.GetCreateMlpRequest().GetData())); err != nil {
types.MakeObjectKeyOfModelFile(name, version), dgst.String(), bytes.NewReader(data)); err != nil {
log.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
default:
msg := fmt.Sprintf("receive unknow request: %#v", modelUploadRequest)
msg := fmt.Sprintf("receive unknow request: %#v", createModelRequest)
log.Error(msg)
return nil, status.Error(codes.FailedPrecondition, msg)
}
Expand All @@ -829,73 +824,74 @@ func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.Create
IP: req.Ip,
SchedulerClusterID: uint(req.ClusterId),
}).Error; err != nil {
log.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}

if err := s.db.WithContext(ctx).Create(models.Model{
Type: modelType,
Version: modelVersion,
Evaluation: modelEvaluation,
SchedulerID: scheduler.ID,
}).Error; err != nil {
rawEvaluation, err := structure.StructToMap(evaluation)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// Create model in database.
if err := s.db.WithContext(ctx).Model(&scheduler).Association("Models").Append(&models.Model{
Type: typ,
Version: fmt.Sprint(version),
State: models.ModelVersionStateInactive,
Evaluation: rawEvaluation,
}); err != nil {
log.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}

return new(emptypb.Empty), nil
}

// createGNNModelConfig creates GNN model config and update GNN config to object storage.
func (s *managerServerV1) createGNNModelConfig(ctx context.Context, modelName string, modelVersion string) error {
var versions []int64
version, err := strconv.ParseInt(modelVersion, 10, 64)
// createModelBucket creates the model bucket if it does not exist.
func (s *managerServerV1) createModelBucket(ctx context.Context) error {
// Check whether the bucket exists.
isExist, err := s.objectStorage.IsBucketExist(ctx, s.config.Trainer.BucketName)
if err != nil {
return err
}
versions = append(versions, version)

pbModelConfig := inferencev1.ModelConfig{
Name: modelName,
Platform: types.DefaultPlatform,
VersionPolicy: &inferencev1.ModelVersionPolicy{
PolicyChoice: &inferencev1.ModelVersionPolicy_Specific_{
Specific: &inferencev1.ModelVersionPolicy_Specific{
Versions: versions,
},
},
},
}

if err := s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName,
types.MakeObjectKeyOfModelConfigFile(modelName), digest.AlgorithmMD5, strings.NewReader(pbModelConfig.String())); err != nil {
return err
// Create the bucket if it does not exist.
if !isExist {
if err := s.objectStorage.CreateBucket(ctx, s.config.Trainer.BucketName); err != nil {
return err
}
}

return nil
}

// createMLPModelConfig creates MLP model config and update MLP config to object storage.
func (s *managerServerV1) createMLPModelConfig(ctx context.Context, modelName string, modelVersion string) error {
var versions []int64
version, err := strconv.ParseInt(modelVersion, 10, 64)
// createModelConfig creates the model config in object storage if it does not already exist.
func (s *managerServerV1) createModelConfig(ctx context.Context, name string) error {
objectKey := types.MakeObjectKeyOfModelConfigFile(name)
isExist, err := s.objectStorage.IsObjectExist(ctx, s.config.Trainer.BucketName, objectKey)
if err != nil {
return err
}
versions = append(versions, version)

// If the model config already exists, skip it.
if isExist {
return nil
}

// If the model config does not exist, create a new model config.
pbModelConfig := inferencev1.ModelConfig{
Name: modelName,
Platform: types.DefaultPlatform,
Name: name,
Platform: types.DefaultTritonPlatform,
VersionPolicy: &inferencev1.ModelVersionPolicy{
PolicyChoice: &inferencev1.ModelVersionPolicy_Specific_{
Specific: &inferencev1.ModelVersionPolicy_Specific{
Versions: versions,
},
Specific: &inferencev1.ModelVersionPolicy_Specific{},
},
},
}

dgst := digest.New(digest.AlgorithmSHA256, digest.SHA256FromStrings(pbModelConfig.String()))
if err := s.objectStorage.PutObject(ctx, s.config.Trainer.BucketName,
types.MakeObjectKeyOfModelConfigFile(modelName), digest.AlgorithmMD5, strings.NewReader(pbModelConfig.String())); err != nil {
types.MakeObjectKeyOfModelConfigFile(name), dgst.String(), strings.NewReader(pbModelConfig.String())); err != nil {
return err
}

Expand Down
Loading

0 comments on commit a8f7c56

Please sign in to comment.