Skip to content

Commit

Permalink
Feat: Add PVC storage support (#267)
Browse files Browse the repository at this point in the history
* Add PVC support

Add PVC support according to the design and discussions
captured in the issue, #230

Signed-off-by: Chin Huang <[email protected]>

* add predictor controller login

Signed-off-by: Chin Huang <[email protected]>

* code restructure, cleanup based on review

Signed-off-by: Chin Huang <[email protected]>

* fix addPullerSidecar to include all pvcs

Signed-off-by: Chin Huang <[email protected]>

* restructure and simplify code, use global configmap rather than env var

Signed-off-by: Chin Huang <[email protected]>

* make AllowAnyPVC dynamic, update docs

Signed-off-by: Chin Huang <[email protected]>

* add runtimeSupportsPredictor check

Signed-off-by: Chin Huang <[email protected]>

* use PredictorRegistry and add storage to find()

Signed-off-by: Chin Huang <[email protected]>

---------

Signed-off-by: Chin Huang <[email protected]>
  • Loading branch information
chinhuang007 authored Mar 1, 2023
1 parent bf412be commit f794ffc
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 25 deletions.
6 changes: 6 additions & 0 deletions controllers/modelmesh/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ const (
//The env variable puller uses to configure the config dir (secrets)
PullerEnvStorageConfigDir = "STORAGE_CONFIG_DIR"

//The env variable puller uses to configure the pvc dir (secrets)
PullerEnvPVCDir = "PVC_MOUNTS_DIR"

//The puller default port number
PullerPortNumber = 8086

//The puller model mount path
PullerModelPath = "/models"

//The puller model PVC path
DefaultPVCMountsDir = "/pvc_mounts"

//The puller model config path
PullerConfigPath = "/storage-config"

Expand Down
3 changes: 2 additions & 1 deletion controllers/modelmesh/modelmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Deployment struct {
RESTProxyImage string
RESTProxyResources *corev1.ResourceRequirements
RESTProxyPort uint16
PVCs []string
// internal fields used when templating
ModelMeshLimitCPU string
ModelMeshRequestsCPU string
Expand Down Expand Up @@ -141,7 +142,7 @@ func (m *Deployment) Apply(ctx context.Context) error {

if useStorageHelper(m.SRSpec) {
manifest, err = manifest.Transform(
addPullerTransform(m.SRSpec, m.PullerImage, m.PullerImageCommand, m.PullerResources),
addPullerTransform(m.SRSpec, m.PullerImage, m.PullerImageCommand, m.PullerResources, m.PVCs),
)
if err != nil {
return fmt.Errorf("Error transforming: %w", err)
Expand Down
17 changes: 14 additions & 3 deletions controllers/modelmesh/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package modelmesh

import (
"path/filepath"
"strconv"

kserveapi "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
Expand All @@ -25,14 +26,14 @@ import (

var StorageSecretName string

func addPullerTransform(rts *kserveapi.ServingRuntimeSpec, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements) func(*unstructured.Unstructured) error {
func addPullerTransform(rts *kserveapi.ServingRuntimeSpec, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements, pvcs []string) func(*unstructured.Unstructured) error {
return func(resource *unstructured.Unstructured) error {
var deployment = &appsv1.Deployment{}
if err := scheme.Scheme.Convert(resource, deployment, nil); err != nil {
return err
}

err := addPullerSidecar(rts, deployment, pullerImage, pullerImageCommand, pullerResources)
err := addPullerSidecar(rts, deployment, pullerImage, pullerImageCommand, pullerResources, pvcs)
if err != nil {
return err
}
Expand All @@ -41,7 +42,7 @@ func addPullerTransform(rts *kserveapi.ServingRuntimeSpec, pullerImage string, p
}
}

func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Deployment, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements) error {
func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Deployment, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements, pvcs []string) error {
endpoint, err := ValidateEndpoint(*rts.GrpcMultiModelManagementEndpoint)
if err != nil {
return err
Expand All @@ -67,6 +68,9 @@ func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Depl
}, {
Name: PullerEnvStorageConfigDir,
Value: PullerConfigPath,
}, {
Name: PullerEnvPVCDir,
Value: DefaultPVCMountsDir,
},
},
Image: pullerImage,
Expand Down Expand Up @@ -98,6 +102,13 @@ func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Depl
MountPath: udsParentPath,
})
}
for _, pvcName := range pvcs {
cspec.VolumeMounts = append(cspec.VolumeMounts, corev1.VolumeMount{
Name: pvcName,
MountPath: DefaultPVCMountsDir + string(filepath.Separator) + pvcName,
ReadOnly: true,
})
}

deployment.Spec.Template.Spec.Containers = append(deployment.Spec.Template.Spec.Containers, cspec)

Expand Down
2 changes: 1 addition & 1 deletion controllers/modelmesh/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestPuller(t *testing.T) {
}
deployment := &appsv1.Deployment{}

err := addPullerSidecar(&rt.Spec, deployment, "", nil, &corev1.ResourceRequirements{})
err := addPullerSidecar(&rt.Spec, deployment, "", nil, &corev1.ResourceRequirements{}, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
24 changes: 24 additions & 0 deletions controllers/modelmesh/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

const (
ModelsDir string = "/models"
PVCRootDir string = "/pvc_mounts"
ModelDirScale float64 = 1.5
)

Expand Down Expand Up @@ -81,6 +82,21 @@ func (m *Deployment) addVolumesToDeployment(deployment *appsv1.Deployment) error
volumes = append(volumes, storageVolume)
}

// need to add pvc volumes
for _, pvcName := range m.PVCs {
pvcVolume := corev1.Volume{
Name: pvcName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
ReadOnly: true,
},
},
}

volumes = append(volumes, pvcVolume)
}

deployment.Spec.Template.Spec.Volumes = volumes

return nil
Expand Down Expand Up @@ -124,6 +140,14 @@ func (m *Deployment) addRuntimeToDeployment(deployment *appsv1.Deployment) error
},
}

for _, pvcName := range m.PVCs {
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: pvcName,
MountPath: PVCRootDir + "/" + pvcName,
ReadOnly: true,
})
}

// Now add the containers specified in serving runtime spec
for i := range rts.Containers {
// by modifying in-place we rely on the fact that the cacheing
Expand Down
86 changes: 86 additions & 0 deletions controllers/servingruntime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ import (
"github.com/kserve/modelmesh-serving/controllers/modelmesh"
)

const (
StoragePVCType = "pvc"
)

// ServingRuntimeReconciler reconciles a ServingRuntime object
type ServingRuntimeReconciler struct {
client.Client
Expand All @@ -75,6 +79,8 @@ type ServingRuntimeReconciler struct {
ClusterScope bool
// whether the controller is enabled to read and watch ClusterServingRuntimes
EnableCSRWatch bool
// whether the controller is enabled to read and watch secrets
EnableSecretWatch bool
// store some information about current runtimes for making scaling decisions
runtimeInfoMap map[types.NamespacedName]*runtimeInfo
runtimeInfoMapMutex sync.Mutex
Expand Down Expand Up @@ -224,6 +230,11 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, fmt.Errorf("Invalid runtime Spec: %w", err)
}

var pvcs []string
if pvcs, err = r.getPVCs(ctx, req, spec, cfg); err != nil {
return ctrl.Result{}, fmt.Errorf("Could not get pvcs: %w", err)
}

// construct the deployment
mmDeployment := modelmesh.Deployment{
ServiceName: cfg.InferenceServiceName,
Expand All @@ -249,6 +260,7 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
PullerResources: cfg.StorageHelperResources.ToKubernetesType(),
Port: cfg.InferenceServicePort,
GrpcMaxMessageSize: cfg.GrpcMaxMessageSizeBytes,
PVCs: pvcs,
// Replicas is set below
TLSSecretName: cfg.TLS.SecretName,
TLSClientAuth: cfg.TLS.ClientAuth,
Expand Down Expand Up @@ -286,6 +298,60 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{RequeueAfter: requeueDuration}, nil
}

func (r *ServingRuntimeReconciler) getPVCs(ctx context.Context, req ctrl.Request, rt *kserveapi.ServingRuntimeSpec, cfg *config.Config) ([]string, error) {
s := &corev1.Secret{}
if err := r.Client.Get(ctx, types.NamespacedName{
Name: modelmesh.StorageSecretName,
Namespace: req.Namespace,
}, s); err != nil {
return nil, fmt.Errorf("Could not get the storage secret: %w", err)
}

pvcsMap := make(map[string]struct{})
var storageConfig map[string]string
for _, storageData := range s.Data {
if err := json.Unmarshal(storageData, &storageConfig); err != nil {
return nil, fmt.Errorf("Could not parse storage configuration json: %w", err)
}
if storageConfig["type"] == StoragePVCType {
if name := storageConfig["name"]; name != "" {
pvcsMap[name] = struct{}{}
} else {
r.Log.V(1).Info("Missing PVC name in storage configuration")
}
}
}

// add pvcs for predictors when the global config is enabled
if cfg.AllowAnyPVC {
restProxyEnabled := cfg.RESTProxy.Enabled
f := func(p *api.Predictor) bool {
if runtimeSupportsPredictor(rt, p, restProxyEnabled, req.Name) &&
p.Spec.Storage != nil && p.Spec.Storage.Parameters != nil {
params := *p.Spec.Storage.Parameters
if stype := params["type"]; stype == "pvc" {
if name := params["name"]; name != "" {
pvcsMap[name] = struct{}{}
}
}
}
return false
}

for _, pr := range r.RegistryMap {
if _, err := pr.Find(ctx, req.Namespace, f); err != nil {
return nil, err
}
}
}
pvcs := make([]string, 0, len(pvcsMap))
for pvc := range pvcsMap {
pvcs = append(pvcs, pvc)
}

return pvcs, nil
}

func (r *ServingRuntimeReconciler) removeRuntimeFromInfoMap(req ctrl.Request) (ctrl.Result, error) {
// remove runtime from info map
r.runtimeInfoMapMutex.Lock()
Expand Down Expand Up @@ -491,6 +557,13 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager,
}))
}

if r.EnableSecretWatch {
builder = builder.Watches(&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
return r.storageSecretRequests(o.(*corev1.Secret))
}))
}

if sourcePluginEvents != nil {
builder.Watches(&source.Channel{Source: sourcePluginEvents},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
Expand Down Expand Up @@ -624,3 +697,16 @@ func (r *ServingRuntimeReconciler) clusterServingRuntimeRequests(csr *kserveapi.

return requests
}

func (r *ServingRuntimeReconciler) storageSecretRequests(secret *corev1.Secret) []reconcile.Request {
// check whether namespace is modelmesh enabled
mme, err := modelMeshEnabled2(context.TODO(), secret.Namespace, r.ControllerNamespace, r.Client, r.ClusterScope)
if err != nil || !mme {
return []reconcile.Request{}
}
if secret.Name == modelmesh.StorageSecretName {
return r.requestsForRuntimes(secret.Namespace, nil)
}

return []reconcile.Request{}
}
1 change: 1 addition & 0 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ The following parameters are currently supported. _Note_ the keys are expressed
| `runtimePodLabels` | `metadata.labels` to be added to all `ServingRuntime` pods | (\*\*\*\*\*) See default labels below |
| `runtimePodAnnotations` | `metadata.annotations` to be added to all `ServingRuntime` pods | (\*\*\*\*\*) See default annotations below |
| `imagePullSecrets` | The image pull secrets to use for runtime Pods | |
| `allowAnyPVC` | Allows any PVC in predictor to configure PVC for runtime pods when it's not in storage secret | `false` |

(\*) Currently requires a controller restart to take effect

Expand Down
14 changes: 0 additions & 14 deletions fvt/predictor/predictor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,20 +1042,6 @@ var _ = Describe("Invalid Predictors", func() {
// TODO can we check for a more detailed error message?
})

It("predictor should fail to load with unsupported storage type", func() {
// modify the object with a PVC storage type, which isn't yet supported
err := unstructured.SetNestedField(predictorObject.Object, map[string]interface{}{
"claimName": "not-yet-supported",
}, "spec", "storage", "persistentVolumeClaim")
Expect(err).ToNot(HaveOccurred())

obj := CreatePredictorAndWaitAndExpectInvalidSpec(predictorObject)

By("Asserting on the predictor state")
ExpectPredictorFailureInfo(obj, "InvalidPredictorSpec", false, false,
"spec.storage.PersistentVolumeClaim is not supported")
})

It("predictor should fail to load with unrecognized model type", func() {
// modify the object with an unrecognized model type
SetString(predictorObject, "invalidModelType", "spec", "modelType", "name")
Expand Down
29 changes: 26 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ const (
LeaderForLifeLockName = "modelmesh-controller-leader-for-life-lock"
EnableInferenceServiceEnvVar = "ENABLE_ISVC_WATCH"
EnableClusterServingRuntimeEnvVar = "ENABLE_CSR_WATCH"
EnableSecretEnvVar = "ENABLE_SECRET_WATCH"
NamespaceScopeEnvVar = "NAMESPACE_SCOPE"
TrueString = "true"
FalseString = "false"
)

func init() {
Expand Down Expand Up @@ -313,7 +315,7 @@ func main() {
registryKey string, registryValue predictor_source.PredictorRegistry) bool {

envVarVal, _ := os.LookupEnv(envVar)
if envVarVal != "false" {
if envVarVal != FalseString {
err = cl.Get(context.Background(), client.ObjectKey{Name: "foo", Namespace: ControllerNamespace}, resourceObject)
if err == nil || errors.IsNotFound(err) {
registryMap[registryKey] = registryValue
Expand All @@ -336,9 +338,9 @@ func main() {
controllers.InferenceServiceCRSourceId, predictor_source.InferenceServiceRegistry{Client: mgr.GetClient()})

checkCSRVar := func(envVar string, resourceName string, resourceObject client.Object) bool {

// default is true
envVarVal, _ := os.LookupEnv(envVar)
if envVarVal != "false" {
if envVarVal != FalseString {
err = cl.Get(context.Background(), client.ObjectKey{Name: "foo", Namespace: ControllerNamespace}, resourceObject)
if err == nil || errors.IsNotFound(err) {
setupLog.Info(fmt.Sprintf("Reconciliation of %s is enabled", resourceName))
Expand All @@ -357,6 +359,26 @@ func main() {
}
enableCSRWatch := checkCSRVar(EnableClusterServingRuntimeEnvVar, "ClusterServingRuntime", &v1alpha1.ClusterServingRuntime{})

checkSecretVar := func(envVar string, resourceName string, resourceObject client.Object) bool {
// default is true
envVarVal, _ := os.LookupEnv(envVar)
if envVarVal != FalseString {
err = cl.Get(context.Background(), client.ObjectKey{Name: "storage-config", Namespace: ControllerNamespace}, resourceObject)
if err == nil || errors.IsNotFound(err) {
setupLog.Info(fmt.Sprintf("Reconciliation of %s is enabled", resourceName))
return true
} else if envVarVal == TrueString {
// If env var is explicitly true, require that specified CRD is present
setupLog.Error(err, fmt.Sprintf("Unable to access %s resource", resourceName))
os.Exit(1)
} else {
setupLog.Error(err, fmt.Sprintf("%s CRD not accessible, will not reconcile", resourceName))
}
}
return false
}
enableSecretWatch := checkSecretVar(EnableSecretEnvVar, "Secret", &corev1.Secret{})

var predictorControllerEvents, runtimeControllerEvents chan event.GenericEvent
if len(sources) != 0 {
predictorControllerEvents = make(chan event.GenericEvent, 256)
Expand Down Expand Up @@ -420,6 +442,7 @@ func main() {
ControllerName: controllerDeploymentName,
ClusterScope: clusterScopeMode,
EnableCSRWatch: enableCSRWatch,
EnableSecretWatch: enableSecretWatch,
RegistryMap: registryMap,
}).SetupWithManager(mgr, enableIsvcWatch, runtimeControllerEvents); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ServingRuntime")
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Config struct {
// System config
EtcdSecretName string // DEPRECATED - should be removed in the future
ModelMeshEndpoint string // For dev use only
AllowAnyPVC bool

// Service config
InferenceServiceName string
Expand Down
Loading

0 comments on commit f794ffc

Please sign in to comment.