diff --git a/controllers/modelmesh/const.go b/controllers/modelmesh/const.go index 977e30de..e6af2ace 100644 --- a/controllers/modelmesh/const.go +++ b/controllers/modelmesh/const.go @@ -45,12 +45,18 @@ const ( //The env variable puller uses to configure the config dir (secrets) PullerEnvStorageConfigDir = "STORAGE_CONFIG_DIR" + //The env variable puller uses to configure the pvc dir (secrets) + PullerEnvPVCDir = "PVC_MOUNTS_DIR" + //The puller default port number PullerPortNumber = 8086 //The puller model mount path PullerModelPath = "/models" + //The puller model PVC path + DefaultPVCMountsDir = "/pvc_mounts" + //The puller model config path PullerConfigPath = "/storage-config" diff --git a/controllers/modelmesh/modelmesh.go b/controllers/modelmesh/modelmesh.go index beec8078..d7f4b2a2 100644 --- a/controllers/modelmesh/modelmesh.go +++ b/controllers/modelmesh/modelmesh.go @@ -53,6 +53,7 @@ type Deployment struct { RESTProxyImage string RESTProxyResources *corev1.ResourceRequirements RESTProxyPort uint16 + PVCs []string // internal fields used when templating ModelMeshLimitCPU string ModelMeshRequestsCPU string @@ -141,7 +142,7 @@ func (m *Deployment) Apply(ctx context.Context) error { if useStorageHelper(m.SRSpec) { manifest, err = manifest.Transform( - addPullerTransform(m.SRSpec, m.PullerImage, m.PullerImageCommand, m.PullerResources), + addPullerTransform(m.SRSpec, m.PullerImage, m.PullerImageCommand, m.PullerResources, m.PVCs), ) if err != nil { return fmt.Errorf("Error transforming: %w", err) diff --git a/controllers/modelmesh/puller.go b/controllers/modelmesh/puller.go index b0ac9910..c7678d3e 100644 --- a/controllers/modelmesh/puller.go +++ b/controllers/modelmesh/puller.go @@ -14,6 +14,7 @@ package modelmesh import ( + "path/filepath" "strconv" kserveapi "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" @@ -25,14 +26,14 @@ import ( var StorageSecretName string -func addPullerTransform(rts *kserveapi.ServingRuntimeSpec, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements) func(*unstructured.Unstructured) error { +func addPullerTransform(rts *kserveapi.ServingRuntimeSpec, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements, pvcs []string) func(*unstructured.Unstructured) error { return func(resource *unstructured.Unstructured) error { var deployment = &appsv1.Deployment{} if err := scheme.Scheme.Convert(resource, deployment, nil); err != nil { return err } - err := addPullerSidecar(rts, deployment, pullerImage, pullerImageCommand, pullerResources) + err := addPullerSidecar(rts, deployment, pullerImage, pullerImageCommand, pullerResources, pvcs) if err != nil { return err } @@ -41,7 +42,7 @@ func addPullerTransform(rts *kserveapi.ServingRuntimeSpec, pullerImage string, p } } -func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Deployment, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements) error { +func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Deployment, pullerImage string, pullerImageCommand []string, pullerResources *corev1.ResourceRequirements, pvcs []string) error { endpoint, err := ValidateEndpoint(*rts.GrpcMultiModelManagementEndpoint) if err != nil { return err @@ -67,6 +68,9 @@ func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Depl }, { Name: PullerEnvStorageConfigDir, Value: PullerConfigPath, + }, { + Name: PullerEnvPVCDir, + Value: DefaultPVCMountsDir, }, }, Image: pullerImage, @@ -98,6 +102,13 @@ func addPullerSidecar(rts *kserveapi.ServingRuntimeSpec, deployment *appsv1.Depl MountPath: udsParentPath, }) } + for _, pvcName := range pvcs { + cspec.VolumeMounts = append(cspec.VolumeMounts, corev1.VolumeMount{ + Name: pvcName, + MountPath: DefaultPVCMountsDir + string(filepath.Separator) + pvcName, + ReadOnly: true, + }) + } deployment.Spec.Template.Spec.Containers = append(deployment.Spec.Template.Spec.Containers, cspec) diff --git a/controllers/modelmesh/puller_test.go b/controllers/modelmesh/puller_test.go index 1bb11dbd..0c5d9eb5 100644 --- a/controllers/modelmesh/puller_test.go +++ b/controllers/modelmesh/puller_test.go @@ -30,7 +30,7 @@ func TestPuller(t *testing.T) { } deployment := &appsv1.Deployment{} - err := addPullerSidecar(&rt.Spec, deployment, "", nil, &corev1.ResourceRequirements{}) + err := addPullerSidecar(&rt.Spec, deployment, "", nil, &corev1.ResourceRequirements{}, nil) if err != nil { t.Fatal(err) } diff --git a/controllers/modelmesh/runtime.go b/controllers/modelmesh/runtime.go index 896f35a7..f6534433 100644 --- a/controllers/modelmesh/runtime.go +++ b/controllers/modelmesh/runtime.go @@ -28,6 +28,7 @@ import ( const ( ModelsDir string = "/models" + PVCRootDir string = "/pvc_mounts" ModelDirScale float64 = 1.5 ) @@ -81,6 +82,21 @@ func (m *Deployment) addVolumesToDeployment(deployment *appsv1.Deployment) error volumes = append(volumes, storageVolume) } + // need to add pvc volumes + for _, pvcName := range m.PVCs { + pvcVolume := corev1.Volume{ + Name: pvcName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvcName, + ReadOnly: true, + }, + }, + } + + volumes = append(volumes, pvcVolume) + } + deployment.Spec.Template.Spec.Volumes = volumes return nil @@ -124,6 +140,14 @@ func (m *Deployment) addRuntimeToDeployment(deployment *appsv1.Deployment) error }, } + for _, pvcName := range m.PVCs { + volumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: pvcName, + MountPath: PVCRootDir + "/" + pvcName, + ReadOnly: true, + }) + } + // Now add the containers specified in serving runtime spec for i := range rts.Containers { // by modifying in-place we rely on the fact that the cacheing diff --git a/controllers/servingruntime_controller.go b/controllers/servingruntime_controller.go index d433cbad..1c262a87 100644 --- a/controllers/servingruntime_controller.go +++ b/controllers/servingruntime_controller.go @@ -62,6 +62,10 @@ import ( "github.com/kserve/modelmesh-serving/controllers/modelmesh" ) +const ( + StoragePVCType = "pvc" +) + // ServingRuntimeReconciler reconciles a ServingRuntime object type ServingRuntimeReconciler struct { client.Client @@ -75,6 +79,8 @@ type ServingRuntimeReconciler struct { ClusterScope bool // whether the controller is enabled to read and watch ClusterServingRuntimes EnableCSRWatch bool + // whether the controller is enabled to read and watch secrets + EnableSecretWatch bool // store some information about current runtimes for making scaling decisions runtimeInfoMap map[types.NamespacedName]*runtimeInfo runtimeInfoMapMutex sync.Mutex @@ -224,6 +230,11 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, fmt.Errorf("Invalid runtime Spec: %w", err) } + var pvcs []string + if pvcs, err = r.getPVCs(ctx, req, spec, cfg); err != nil { + return ctrl.Result{}, fmt.Errorf("Could not get pvcs: %w", err) + } + // construct the deployment mmDeployment := modelmesh.Deployment{ ServiceName: cfg.InferenceServiceName, @@ -249,6 +260,7 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque PullerResources: cfg.StorageHelperResources.ToKubernetesType(), Port: cfg.InferenceServicePort, GrpcMaxMessageSize: cfg.GrpcMaxMessageSizeBytes, + PVCs: pvcs, // Replicas is set below TLSSecretName: cfg.TLS.SecretName, TLSClientAuth: cfg.TLS.ClientAuth, @@ -286,6 +298,60 @@ func (r *ServingRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{RequeueAfter: requeueDuration}, nil } +func (r *ServingRuntimeReconciler) getPVCs(ctx context.Context, req ctrl.Request, rt *kserveapi.ServingRuntimeSpec, cfg *config.Config) ([]string, error) { + s := &corev1.Secret{} + if err := r.Client.Get(ctx, types.NamespacedName{ + Name: modelmesh.StorageSecretName, + Namespace: req.Namespace, + }, s); err != nil { + return nil, fmt.Errorf("Could not get the storage secret: %w", err) + } + + pvcsMap := make(map[string]struct{}) + var storageConfig map[string]string + for _, storageData := range s.Data { + if err := json.Unmarshal(storageData, &storageConfig); err != nil { + return nil, fmt.Errorf("Could not parse storage configuration json: %w", err) + } + if storageConfig["type"] == StoragePVCType { + if name := storageConfig["name"]; name != "" { + pvcsMap[name] = struct{}{} + } else { + r.Log.V(1).Info("Missing PVC name in storage configuration") + } + } + } + + // add pvcs for predictors when the global config is enabled + if cfg.AllowAnyPVC { + restProxyEnabled := cfg.RESTProxy.Enabled + f := func(p *api.Predictor) bool { + if runtimeSupportsPredictor(rt, p, restProxyEnabled, req.Name) && + p.Spec.Storage != nil && p.Spec.Storage.Parameters != nil { + params := *p.Spec.Storage.Parameters + if stype := params["type"]; stype == "pvc" { + if name := params["name"]; name != "" { + pvcsMap[name] = struct{}{} + } + } + } + return false + } + + for _, pr := range r.RegistryMap { + if _, err := pr.Find(ctx, req.Namespace, f); err != nil { + return nil, err + } + } + } + pvcs := make([]string, 0, len(pvcsMap)) + for pvc := range pvcsMap { + pvcs = append(pvcs, pvc) + } + + return pvcs, nil +} + func (r *ServingRuntimeReconciler) removeRuntimeFromInfoMap(req ctrl.Request) (ctrl.Result, error) { // remove runtime from info map r.runtimeInfoMapMutex.Lock() @@ -491,6 +557,13 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager, })) } + if r.EnableSecretWatch { + builder = builder.Watches(&source.Kind{Type: &corev1.Secret{}}, + handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + return r.storageSecretRequests(o.(*corev1.Secret)) + })) + } + if sourcePluginEvents != nil { builder.Watches(&source.Channel{Source: sourcePluginEvents}, handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { @@ -624,3 +697,16 @@ func (r *ServingRuntimeReconciler) clusterServingRuntimeRequests(csr *kserveapi. return requests } + +func (r *ServingRuntimeReconciler) storageSecretRequests(secret *corev1.Secret) []reconcile.Request { + // check whether namespace is modelmesh enabled + mme, err := modelMeshEnabled2(context.TODO(), secret.Namespace, r.ControllerNamespace, r.Client, r.ClusterScope) + if err != nil || !mme { + return []reconcile.Request{} + } + if secret.Name == modelmesh.StorageSecretName { + return r.requestsForRuntimes(secret.Namespace, nil) + } + + return []reconcile.Request{} +} diff --git a/docs/configuration/README.md b/docs/configuration/README.md index 51fa9eb7..c8f78178 100644 --- a/docs/configuration/README.md +++ b/docs/configuration/README.md @@ -53,6 +53,7 @@ The following parameters are currently supported. _Note_ the keys are expressed | `runtimePodLabels` | `metadata.labels` to be added to all `ServingRuntime` pods | (\*\*\*\*\*) See default labels below | | `runtimePodAnnotations` | `metadata.annotations` to be added to all `ServingRuntime` pods | (\*\*\*\*\*) See default annotations below | | `imagePullSecrets` | The image pull secrets to use for runtime Pods | | +| `allowAnyPVC` | Allows any PVC in predictor to configure PVC for runtime pods when it's not in storage secret | `false` | (\*) Currently requires a controller restart to take effect diff --git a/fvt/predictor/predictor_test.go b/fvt/predictor/predictor_test.go index 65327f58..bfc8e9ee 100644 --- a/fvt/predictor/predictor_test.go +++ b/fvt/predictor/predictor_test.go @@ -1042,20 +1042,6 @@ var _ = Describe("Invalid Predictors", func() { // TODO can we check for a more detailed error message? }) - It("predictor should fail to load with unsupported storage type", func() { - // modify the object with a PVC storage type, which isn't yet supported - err := unstructured.SetNestedField(predictorObject.Object, map[string]interface{}{ - "claimName": "not-yet-supported", - }, "spec", "storage", "persistentVolumeClaim") - Expect(err).ToNot(HaveOccurred()) - - obj := CreatePredictorAndWaitAndExpectInvalidSpec(predictorObject) - - By("Asserting on the predictor state") - ExpectPredictorFailureInfo(obj, "InvalidPredictorSpec", false, false, - "spec.storage.PersistentVolumeClaim is not supported") - }) - It("predictor should fail to load with unrecognized model type", func() { // modify the object with an unrecognized model type SetString(predictorObject, "invalidModelType", "spec", "modelType", "name") diff --git a/main.go b/main.go index d3e3d0d1..3ce4518c 100644 --- a/main.go +++ b/main.go @@ -78,8 +78,10 @@ const ( LeaderForLifeLockName = "modelmesh-controller-leader-for-life-lock" EnableInferenceServiceEnvVar = "ENABLE_ISVC_WATCH" EnableClusterServingRuntimeEnvVar = "ENABLE_CSR_WATCH" + EnableSecretEnvVar = "ENABLE_SECRET_WATCH" NamespaceScopeEnvVar = "NAMESPACE_SCOPE" TrueString = "true" + FalseString = "false" ) func init() { @@ -313,7 +315,7 @@ func main() { registryKey string, registryValue predictor_source.PredictorRegistry) bool { envVarVal, _ := os.LookupEnv(envVar) - if envVarVal != "false" { + if envVarVal != FalseString { err = cl.Get(context.Background(), client.ObjectKey{Name: "foo", Namespace: ControllerNamespace}, resourceObject) if err == nil || errors.IsNotFound(err) { registryMap[registryKey] = registryValue @@ -336,9 +338,9 @@ func main() { controllers.InferenceServiceCRSourceId, predictor_source.InferenceServiceRegistry{Client: mgr.GetClient()}) checkCSRVar := func(envVar string, resourceName string, resourceObject client.Object) bool { - + // default is true envVarVal, _ := os.LookupEnv(envVar) - if envVarVal != "false" { + if envVarVal != FalseString { err = cl.Get(context.Background(), client.ObjectKey{Name: "foo", Namespace: ControllerNamespace}, resourceObject) if err == nil || errors.IsNotFound(err) { setupLog.Info(fmt.Sprintf("Reconciliation of %s is enabled", resourceName)) @@ -357,6 +359,26 @@ func main() { } enableCSRWatch := checkCSRVar(EnableClusterServingRuntimeEnvVar, "ClusterServingRuntime", &v1alpha1.ClusterServingRuntime{}) + checkSecretVar := func(envVar string, resourceName string, resourceObject client.Object) bool { + // default is true + envVarVal, _ := os.LookupEnv(envVar) + if envVarVal != FalseString { + err = cl.Get(context.Background(), client.ObjectKey{Name: "storage-config", Namespace: ControllerNamespace}, resourceObject) + if err == nil || errors.IsNotFound(err) { + setupLog.Info(fmt.Sprintf("Reconciliation of %s is enabled", resourceName)) + return true + } else if envVarVal == TrueString { + // If env var is explicitly true, require that specified CRD is present + setupLog.Error(err, fmt.Sprintf("Unable to access %s resource", resourceName)) + os.Exit(1) + } else { + setupLog.Error(err, fmt.Sprintf("%s CRD not accessible, will not reconcile", resourceName)) + } + } + return false + } + enableSecretWatch := checkSecretVar(EnableSecretEnvVar, "Secret", &corev1.Secret{}) + var predictorControllerEvents, runtimeControllerEvents chan event.GenericEvent if len(sources) != 0 { predictorControllerEvents = make(chan event.GenericEvent, 256) @@ -420,6 +442,7 @@ func main() { ControllerName: controllerDeploymentName, ClusterScope: clusterScopeMode, EnableCSRWatch: enableCSRWatch, + EnableSecretWatch: enableSecretWatch, RegistryMap: registryMap, }).SetupWithManager(mgr, enableIsvcWatch, runtimeControllerEvents); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ServingRuntime") diff --git a/pkg/config/config.go b/pkg/config/config.go index 4e4109b0..a67c63de 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -57,6 +57,7 @@ type Config struct { // System config EtcdSecretName string // DEPRECATED - should be removed in the future ModelMeshEndpoint string // For dev use only + AllowAnyPVC bool // Service config InferenceServiceName string diff --git a/pkg/predictor_source/inferenceservice_registry.go b/pkg/predictor_source/inferenceservice_registry.go index 9c6bc829..da617dfe 100644 --- a/pkg/predictor_source/inferenceservice_registry.go +++ b/pkg/predictor_source/inferenceservice_registry.go @@ -162,6 +162,10 @@ func processInferenceServiceStorage(inferenceService *v1beta1.InferenceService, } switch u.Scheme { + case "pvc": + modelPath = strings.TrimPrefix(u.Path, "/") + uriParameters["type"] = "pvc" + uriParameters["name"] = u.Host case "s3": modelPath = strings.TrimPrefix(u.Path, "/") uriParameters["type"] = "s3" @@ -318,7 +322,6 @@ func (isvcr InferenceServiceRegistry) Get(ctx context.Context, nname types.Names p.Spec.Storage.SchemaPath = schemaPath p.Spec.Storage.Parameters = ¶meters p.Spec.Storage.StorageKey = secretKey - return p, nil } @@ -332,10 +335,27 @@ func (isvcr InferenceServiceRegistry) Find(ctx context.Context, namespace string } for i := range list.Items { - p, _ := BuildBasePredictorFromInferenceService(&list.Items[i]) - if p != nil && predicate(p) { + inferenceService := &list.Items[i] + nname := types.NamespacedName{Name: inferenceService.Name, Namespace: inferenceService.Namespace} + p, err := BuildBasePredictorFromInferenceService(inferenceService) + if err != nil { return true, nil } + if p != nil { + secretKey, parameters, modelPath, schemaPath, err := processInferenceServiceStorage(inferenceService, nname) + if err != nil { + return true, nil + } + p.Spec.Storage = &v1alpha1.Storage{} + p.Spec.Storage.Path = &modelPath + p.Spec.Storage.SchemaPath = schemaPath + p.Spec.Storage.Parameters = ¶meters + p.Spec.Storage.StorageKey = secretKey + + if predicate(p) { + return true, nil + } + } } return false, nil }