Skip to content

Commit

Permalink
Merge pull request #4 from Riskified/DEV-71016
Browse files Browse the repository at this point in the history
Add / remove subsets
  • Loading branch information
Nisan Itzhakov authored Dec 12, 2023
2 parents 25595c8 + c3a2ad3 commit 2bc4036
Show file tree
Hide file tree
Showing 12 changed files with 785 additions and 77 deletions.
15 changes: 3 additions & 12 deletions api/v1alpha1/dynamicenv_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,17 @@ func (de *DynamicEnv) validateIstioMatchImmutable(old runtime.Object) error {
func (de *DynamicEnv) validatePartialUpdateSubsets(old runtime.Object) error {
oldSubsets := append(old.(*DynamicEnv).Spec.Subsets, old.(*DynamicEnv).Spec.Consumers...)
newSubsets := append(de.Spec.Subsets, de.Spec.Consumers...)
if len(oldSubsets) != len(newSubsets) {
desc := "Unsupported operation: add or remove subset"
return field.Invalid(field.NewPath("spec").Child("Subsets"), newSubsets, desc)
}
for _, subset := range oldSubsets {
if err := findMatchingSubset(subset, newSubsets); err != nil {
if err := validateSubsetModifications(subset, newSubsets); err != nil {
return err
}
}
return nil
}

func findMatchingSubset(old Subset, subsets []Subset) error {
var foundMatching = false
func validateSubsetModifications(old Subset, subsets []Subset) error {
for _, s := range subsets {
if old.Name == s.Name && old.Namespace == s.Namespace {
foundMatching = true
if err := compareContainers(old.Containers, s.Containers); err != nil {
desc := fmt.Sprintf("couldn't find matching container to existing container: %s", err.Error())
return field.Invalid(
Expand All @@ -198,12 +192,9 @@ func findMatchingSubset(old Subset, subsets []Subset) error {
)
return field.Invalid(field.NewPath("spec").Child("Subsets"), subsets, desc)
}
break
}
}
if !foundMatching {
desc := "Changing name/namespace of subset is forbidden"
return field.Invalid(field.NewPath("spec").Child("Subsets"), subsets, desc)
}
return nil
}

Expand Down
30 changes: 0 additions & 30 deletions api/v1alpha1/dynamicenv_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,36 +421,6 @@ var _ = Describe("Validating Webhook", func() {
Expect(errorResult).To(HaveOccurred())
Expect(errorResult.Error()).To(ContainSubstring(partialError))
},
Entry(
"Removing Subset",
"fixtures/disallowed-modifications-removing-subset-old.yaml",
"fixtures/disallowed-modifications-removing-subset-new.yaml",
"add or remove",
),
Entry(
"Removing Consumer",
"fixtures/disallowed-modifications-removing-consumer-old.yaml",
"fixtures/disallowed-modifications-removing-consumer-new.yaml",
"add or remove",
),
Entry(
"Adding Subset",
"fixtures/disallowed-modifications-adding-subset-old.yaml",
"fixtures/disallowed-modifications-adding-subset-new.yaml",
"add or remove",
),
Entry(
"modifying subsets name",
"fixtures/disallowed-modifications-modifying-subset-name-old.yaml",
"fixtures/disallowed-modifications-modifying-subset-name-new.yaml",
"name/namespace of subset",
),
Entry(
"modifying subsets namespace",
"fixtures/disallowed-modifications-modifying-subsets-namespace-old.yaml",
"fixtures/disallowed-modifications-modifying-subsets-namespace-new.yaml",
"name/namespace of subset",
),
Entry(
"modifying container name",
"fixtures/disallowed-modifications-modifying-container-name-old.yaml",
Expand Down
232 changes: 201 additions & 31 deletions controllers/dynamicenv_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type DynamicEnvReconciler struct {

type ReconcileLoopStatus struct {
returnError error
cleanupError error
subsetMessages map[string]riskifiedv1alpha1.SubsetMessages
consumerMessages map[string][]string
// Non ready consumers and subsets
Expand Down Expand Up @@ -117,10 +118,10 @@ func (r *DynamicEnvReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, fmt.Errorf("failed to fetch dynamic environment: %w", err)
}

uniqueVersion := helpers.UniqueDynamicEnvName(dynamicEnv.Name, dynamicEnv.Namespace)
uniqueVersion := helpers.UniqueDynamicEnvName(dynamicEnv)

if markedForDeletion(dynamicEnv) {
return r.cleanDynamicEnvResources(ctx, dynamicEnv, uniqueVersion)
return r.cleanDynamicEnvResources(ctx, dynamicEnv)
}

if err := r.addFinalizersIfRequired(ctx, dynamicEnv); err != nil {
Expand All @@ -142,9 +143,13 @@ func (r *DynamicEnvReconciler) Reconcile(ctx context.Context, req ctrl.Request)

subsetsAndConsumers := mergeSubsetsAndConsumers(dynamicEnv.Spec.Subsets, dynamicEnv.Spec.Consumers)

if err := r.cleanupRemovedSubsetsOrConsumers(ctx, subsetsAndConsumers, uniqueVersion, dynamicEnv); err != nil {
rls.cleanupError = err
}

for _, st := range subsetsAndConsumers {
s := st.Subset
uniqueName := s.Name + "-" + uniqueVersion
uniqueName := mkSubsetUniqueName(s.Name, uniqueVersion)
defaultVersionForSubset := r.DefaultVersion
if s.DefaultVersion != "" {
defaultVersionForSubset = s.DefaultVersion
Expand Down Expand Up @@ -247,6 +252,12 @@ func (r *DynamicEnvReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}

if rls.cleanupError != nil {
// While the error occurred previously these errors are less important than the main subsets loop so we apply
// them now only if not masking.
rls.setErrorIfNotMasking(rls.returnError)
}

for _, handler := range deploymentHandlers {
newStatus, err := handler.GetStatus()
if err != nil {
Expand Down Expand Up @@ -303,8 +314,12 @@ func (r *DynamicEnvReconciler) Reconcile(ctx context.Context, req ctrl.Request)
statusHandler.SyncSubsetMessagesToStatus(rls.subsetMessages)
statusHandler.SyncConsumerMessagesToStatus(rls.consumerMessages)
if err := statusHandler.SetGlobalState(globalState, len(subsetsAndConsumers), len(rls.nonReadyCS)); err != nil {
log.Error(err, "error setting global state", "global-state", globalState, "subsetMessages", rls.subsetMessages)
rls.setErrorIfNotMasking(err)
if errors.IsConflict(err) {
log.Info("Ignoring global status update error due to conflict")
} else {
log.Error(err, "error setting global state", "global-state", globalState, "subsetMessages", rls.subsetMessages)
rls.setErrorIfNotMasking(err)
}
}

if nonReadyExists && rls.returnError == nil {
Expand All @@ -315,6 +330,28 @@ func (r *DynamicEnvReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, rls.returnError
}

func findDeletedSC(de *riskifiedv1alpha1.DynamicEnv, allSC map[string]riskifiedv1alpha1.SubsetOrConsumer) map[string]riskifiedv1alpha1.SubsetOrConsumer {
var keys []string
for k := range allSC {
keys = append(keys, k)
}
var removed = make(map[string]riskifiedv1alpha1.SubsetOrConsumer)
for name := range de.Status.ConsumersStatus {
if !helpers.StringSliceContains(name, keys) {
removed[name] = riskifiedv1alpha1.CONSUMER
}
}
for name := range de.Status.SubsetsStatus {
if !helpers.StringSliceContains(name, keys) {
removed[name] = riskifiedv1alpha1.SUBSET
}
}
if len(removed) > 0 {
ctrl.Log.V(1).Info("Found subsets/consumers to be cleaned up", "subsets/consumers", removed)
}
return removed
}

func (r *DynamicEnvReconciler) locateMatchingServiceHostnames(ctx context.Context, namespace string, ls labels.Set) (serviceHosts []string, err error) {

services := v1.ServiceList{}
Expand Down Expand Up @@ -357,7 +394,7 @@ func (r *DynamicEnvReconciler) addFinalizersIfRequired(ctx context.Context, de *
return nil
}

func (r *DynamicEnvReconciler) cleanDynamicEnvResources(ctx context.Context, de *riskifiedv1alpha1.DynamicEnv, version string) (ctrl.Result, error) {
func (r *DynamicEnvReconciler) cleanDynamicEnvResources(ctx context.Context, de *riskifiedv1alpha1.DynamicEnv) (ctrl.Result, error) {
log := ctrllog.FromContext(ctx)
log.Info("Dynamic Env marked for deletion, cleaning up ...")
if helpers.StringSliceContains(names.DeleteDeployments, de.Finalizers) {
Expand Down Expand Up @@ -385,7 +422,7 @@ func (r *DynamicEnvReconciler) cleanDynamicEnvResources(ctx context.Context, de
}
}
if helpers.StringSliceContains(names.CleanupVirtualServices, de.Finalizers) {
if err := r.cleanupVirtualServices(ctx, de, version); err != nil {
if err := r.cleanupVirtualServices(ctx, de); err != nil {
log.Error(err, "error removing CleanupVirtualServices finalizer")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -450,32 +487,13 @@ func (r *DynamicEnvReconciler) cleanupDestinationRules(ctx context.Context, de *
return runningCount, nil
}

func (r *DynamicEnvReconciler) cleanupVirtualServices(ctx context.Context, de *riskifiedv1alpha1.DynamicEnv, version string) error {
func (r *DynamicEnvReconciler) cleanupVirtualServices(ctx context.Context, de *riskifiedv1alpha1.DynamicEnv) error {
vss := collectVirtualServices(de)
for _, item := range vss {
ctrl.Log.Info("Cleaning up Virtual Service ...", "virtual-service", item)
found := istionetwork.VirtualService{}
if err := r.Get(ctx, types.NamespacedName{Name: item.Name, Namespace: item.Namespace}, &found); err != nil {
if errors.IsNotFound(err) {
ctrl.Log.Info("Cleanup: Didn't find virtual service. Probably deleted", "virtual-service", item)
continue
}
ctrl.Log.Error(err, "error searching for virtual service during cleanup", "virtual-service", item)
return err
}
var newRoutes []*istioapi.HTTPRoute
for _, route := range found.Spec.Http {
if strings.HasPrefix(route.Name, helpers.CalculateVirtualServicePrefix(version, "")) {
ctrl.Log.V(1).Info("Found route to cleanup", "route", route)
continue
}
newRoutes = append(newRoutes, route)
}
found.Spec.Http = newRoutes
watches.RemoveFromAnnotation(types.NamespacedName{Name: de.Name, Namespace: de.Namespace}, &found)
if err := r.Update(ctx, &found); err != nil {
ctrl.Log.Error(err, "error updating virtual service after cleanup", "virtual-service", found.Name)
return err
// TODO: trying to ignore version collision and return nil causes a lot of kuttl tests failures and dynamic
// environments who could not be deleted because of finalizers.
if err := r.cleanupVirtualService(ctx, item, de); err != nil {
return fmt.Errorf("cleaning up all virtual services: %w", err)
}
}
return nil
Expand All @@ -502,6 +520,154 @@ func (r *DynamicEnvReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

// Cleanup subsets and consumers that are removed from the dynamic environment CRD.
func (r *DynamicEnvReconciler) cleanupRemovedSubsetsOrConsumers(ctx context.Context, subsetsAndConsumers []SubsetType, version string, de *riskifiedv1alpha1.DynamicEnv) error {
var allSC = make(map[string]riskifiedv1alpha1.SubsetOrConsumer)
for _, st := range subsetsAndConsumers {
uniqueName := mkSubsetUniqueName(st.Subset.Name, version)
allSC[uniqueName] = st.Type
}
deletedSC := findDeletedSC(de, allSC)
for name, typ := range deletedSC {
if typ == riskifiedv1alpha1.CONSUMER {
if err := r.cleanupConsumer(ctx, name, de); err != nil {
if errors.IsConflict(err) {
ctrl.Log.Info("Ignoring conflict removing consumer", "consumer", name)
} else {
ctrl.Log.Error(err, "cleanup removed consumer")
return fmt.Errorf("cleanup removed consumer: %w", err)
}
}
} else {
if err := r.cleanupSubset(ctx, name, de); err != nil {
if errors.IsConflict(err) {
ctrl.Log.Info("Ignoring conflict removing subset", "subset", name)
} else {
ctrl.Log.Error(err, "cleanup removed subset")
return fmt.Errorf("cleanup removed subset: %w", err)
}
}
}
}

return nil
}

// searches for `name` in the consumersStatus and delete it. If not found deletes from status
func (r *DynamicEnvReconciler) cleanupConsumer(ctx context.Context, name string, de *riskifiedv1alpha1.DynamicEnv) error {
st, ok := de.Status.ConsumersStatus[name]
if ok {
// todo delete resources
found, err := r.deleteDeployment(ctx, st.ResourceStatus)
if err != nil {
return fmt.Errorf("deleting removed consumer: %w", err)
}
if !found {
delete(de.Status.ConsumersStatus, name)
if err := r.Status().Update(ctx, de); err != nil {
return fmt.Errorf("deleting removed consumer status: %w", err)
}
}
}
ctrl.Log.V(1).Info("Consumer cleanup finished", "consumer", name)
return nil
}

// searches for `name` in the subsetsStatus and delete it. If not found deletes from status
func (r *DynamicEnvReconciler) cleanupSubset(ctx context.Context, name string, de *riskifiedv1alpha1.DynamicEnv) error {
st, ok := de.Status.SubsetsStatus[name]
exists := false
if ok {
found, err := r.deleteDeployment(ctx, st.Deployment)
if err != nil {
return fmt.Errorf("deleting deployment from removed subset: %w", err)
}
if found {
exists = found
}
for _, dr := range st.DestinationRules {
found, err := r.deleteDestinationRule(ctx, dr)
if err != nil {
return fmt.Errorf("deleting destination rule from removed subset: %w", err)
}
if found {
exists = found
}
}
for _, vs := range st.VirtualServices {
if err := r.cleanupVirtualService(ctx, vs, de); err != nil {
return fmt.Errorf("cleaning virtual servive from removed subset routes: %w", err)
}
}
}
if !exists {
delete(de.Status.SubsetsStatus, name)
if err := r.Status().Update(ctx, de); err != nil {
return fmt.Errorf("deleting removed subset status: %w", err)
}
}
ctrl.Log.V(1).Info("Subset cleanup finished", "subset", name)
return nil
}

func (r *DynamicEnvReconciler) deleteDeployment(ctx context.Context, deployment riskifiedv1alpha1.ResourceStatus) (found bool, err error) {
dep := appsv1.Deployment{}
if err := r.Get(ctx, types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, &dep); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, fmt.Errorf("fetching deployment for removal: %w", err)
}
if err := r.Delete(ctx, &dep); err != nil {
return true, fmt.Errorf("deoeting deployment: %w", err)
}
return true, nil
}

func (r *DynamicEnvReconciler) deleteDestinationRule(ctx context.Context, dr riskifiedv1alpha1.ResourceStatus) (found bool, err error) {
toDelete := istionetwork.DestinationRule{}
if err := r.Get(ctx, types.NamespacedName{Name: dr.Name, Namespace: dr.Namespace}, &toDelete); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, fmt.Errorf("fetching destination rule for deletion: %w", err)
}
if err := r.Delete(ctx, &toDelete); err != nil {
return true, fmt.Errorf("deleting destination rule: %w", err)
}
return true, nil
}

func (r *DynamicEnvReconciler) cleanupVirtualService(ctx context.Context, vs riskifiedv1alpha1.ResourceStatus, de *riskifiedv1alpha1.DynamicEnv) error {
version := helpers.UniqueDynamicEnvName(de)
ctrl.Log.Info("Cleaning up Virtual Service ...", "virtual-service", vs)
found := istionetwork.VirtualService{}
if err := r.Get(ctx, types.NamespacedName{Name: vs.Name, Namespace: vs.Namespace}, &found); err != nil {
if errors.IsNotFound(err) {
ctrl.Log.Info("Cleanup: Didn't find virtual service. Probably deleted", "virtual-service", vs)
return nil
} else {
ctrl.Log.Error(err, "error searching for virtual service during cleanup", "virtual-service", vs)
return err
}
}
var newRoutes []*istioapi.HTTPRoute
for _, route := range found.Spec.Http {
if strings.HasPrefix(route.Name, helpers.CalculateVirtualServicePrefix(version, "")) {
ctrl.Log.V(1).Info("Found route to cleanup", "route", route)
continue
}
newRoutes = append(newRoutes, route)
}
found.Spec.Http = newRoutes
watches.RemoveFromAnnotation(types.NamespacedName{Name: de.Name, Namespace: de.Namespace}, &found)
if err := r.Update(ctx, &found); err != nil {
ctrl.Log.Error(err, "error updating virtual service after cleanup", "virtual-service", found.Name)
return err
}
return nil
}

func mergeSubsetsAndConsumers(subsets, consumers []riskifiedv1alpha1.Subset) []SubsetType {
var result []SubsetType
for _, s := range subsets {
Expand Down Expand Up @@ -532,3 +698,7 @@ func collectVirtualServices(de *riskifiedv1alpha1.DynamicEnv) []riskifiedv1alpha
func markedForDeletion(de *riskifiedv1alpha1.DynamicEnv) bool {
return de.DeletionTimestamp != nil
}

func mkSubsetUniqueName(name, version string) string {
return name + "-" + version
}
Loading

0 comments on commit 2bc4036

Please sign in to comment.