Skip to content

Commit

Permalink
Include EiT state as part of the desired state hash sent to clients
Browse files Browse the repository at this point in the history
When Encryption in-transit(EiT) is enabled/disabled the kernel mount
option for cephFS needs to be updated between prefer-crc/secure. So the
desired state hash needs to include the EiT state, so that
if the EiT state is changed the desired state hash will change and
the client will reconcile to get the updated mount option.

Signed-off-by: Malay Kumar Parida <[email protected]>
  • Loading branch information
malayparida2000 committed Oct 17, 2024
1 parent f80f43b commit ebce8fb
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 11 deletions.
75 changes: 64 additions & 11 deletions services/provider/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"encoding/json"
"encoding/pem"
"fmt"
"k8s.io/utils/ptr"
"math"
"net"
"slices"
"strconv"
"strings"
"time"

"k8s.io/utils/ptr"

"github.com/blang/semver/v4"
nbv1 "github.com/noobaa/noobaa-operator/v5/pkg/apis/noobaa/v1alpha1"
quotav1 "github.com/openshift/api/quota/v1"
Expand Down Expand Up @@ -193,7 +194,17 @@ func (s *OCSProviderServer) GetStorageConfig(ctx context.Context, req *pb.Storag
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to construct status response: %v", err)
}
desiredClientConfigHash := getDesiredClientConfigHash(channelName, consumerObj)

storageCluster, err := s.getStorageCluster(ctx)
if err != nil {
return nil, err
}

desiredClientConfigHash := getDesiredClientConfigHash(
channelName,
consumerObj,
isEncryptionInTransitEnabled(storageCluster.Spec.Network),
)

klog.Infof("successfully returned the config details to the consumer.")
return &pb.StorageConfigResponse{
Expand Down Expand Up @@ -751,15 +762,12 @@ func (s *OCSProviderServer) GetStorageClaimConfig(ctx context.Context, req *pb.S
"csi.storage.k8s.io/controller-expand-secret-name": provisionerSecretName,
}

storageClusters := &ocsv1.StorageClusterList{}
if err := s.client.List(ctx, storageClusters, client.InNamespace(s.namespace), client.Limit(2)); err != nil {
return nil, status.Errorf(codes.Internal, "failed to get storage cluster: %v", err)
}
if len(storageClusters.Items) != 1 {
return nil, status.Errorf(codes.Internal, "expecting one single storagecluster to exist")
storageCluster, err := s.getStorageCluster(ctx)
if err != nil {
return nil, err
}
var kernelMountOptions map[string]string
for _, option := range strings.Split(util.GetCephFSKernelMountOptions(&storageClusters.Items[0]), ",") {
for _, option := range strings.Split(util.GetCephFSKernelMountOptions(storageCluster), ",") {
if kernelMountOptions == nil {
kernelMountOptions = map[string]string{}
}
Expand Down Expand Up @@ -847,18 +855,28 @@ func (s *OCSProviderServer) ReportStatus(ctx context.Context, req *pb.ReportStat
return nil, status.Errorf(codes.Internal, "Failed to construct status response: %v", err)
}

desiredClientConfigHash := getDesiredClientConfigHash(channelName, storageConsumer)
storageCluster, err := s.getStorageCluster(ctx)
if err != nil {
return nil, err
}

desiredClientConfigHash := getDesiredClientConfigHash(
channelName,
storageConsumer,
isEncryptionInTransitEnabled(storageCluster.Spec.Network),
)

return &pb.ReportStatusResponse{
DesiredClientOperatorChannel: channelName,
DesiredConfigHash: desiredClientConfigHash,
}, nil
}

func getDesiredClientConfigHash(channelName string, storageConsumer *ocsv1alpha1.StorageConsumer) string {
func getDesiredClientConfigHash(channelName string, storageConsumer *ocsv1alpha1.StorageConsumer, encryptionInTransit bool) string {
var arr = []any{
channelName,
storageConsumer.Spec.StorageQuotaInGiB,
encryptionInTransit,
}
return util.CalculateMD5Hash(arr)
}
Expand All @@ -878,6 +896,41 @@ func (s *OCSProviderServer) getOCSSubscriptionChannel(ctx context.Context) (stri
return subscription.Spec.Channel, nil
}

func (s *OCSProviderServer) getStorageCluster(ctx context.Context) (*ocsv1.StorageCluster, error) {
scList := &ocsv1.StorageClusterList{}
if err := s.client.List(ctx, scList, client.InNamespace(s.namespace)); err != nil {
return nil, status.Errorf(codes.Internal, "failed to list storage clusters: %v", err)
}

var foundSc *ocsv1.StorageCluster
for i := range scList.Items {
sc := &scList.Items[i]
if sc.Status.Phase == util.PhaseIgnored {
continue // Skip Ignored storage cluster
}
if sc.Spec.AllowRemoteStorageConsumers {
if foundSc != nil {
// This means we have already found one storage cluster, so this is a second one
return nil, status.Errorf(codes.FailedPrecondition, "multiple provider storage clusters found")
}
foundSc = sc
}
}

if foundSc == nil {
return nil, status.Errorf(codes.NotFound, "no provider storage cluster found")
}

return foundSc, nil
}

func isEncryptionInTransitEnabled(networkSpec *rookCephv1.NetworkSpec) bool {
return networkSpec != nil &&
networkSpec.Connections != nil &&
networkSpec.Connections.Encryption != nil &&
networkSpec.Connections.Encryption.Enabled
}

func extractMonitorIps(data string) ([]string, error) {
var ips []string
mons := strings.Split(data, ",")
Expand Down
9 changes: 9 additions & 0 deletions services/provider/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,15 @@ func TestGetExternalResources(t *testing.T) {
ocsSubscription.Spec = ocsSubscriptionSpec
assert.NoError(t, client.Create(ctx, ocsSubscription))

storageCluster := &ocsv1.StorageCluster{
Spec: ocsv1.StorageClusterSpec{
AllowRemoteStorageConsumers: true,
},
}
storageCluster.Name = "test-storagecluster"
storageCluster.Namespace = serverNamespace
assert.NoError(t, client.Create(ctx, storageCluster))

// When ocsv1alpha1.StorageConsumerStateReady
req := pb.StorageConfigRequest{
StorageConsumerUUID: string(consumerResource.UID),
Expand Down

0 comments on commit ebce8fb

Please sign in to comment.