Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retries for policies cm comprised of multiple dependent file #163

Merged
merged 3 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ k3d: && _skaffold-ctx
k3d cluster delete kube-mgmt || true
k3d cluster create --config ./test/e2e/k3d.yaml

rebuild: && build
rm -rf {{skaffoldTags}}

# build and publish docker to local registry
build: _skaffold-ctx
skaffold build --file-output={{skaffoldTags}} --platform=linux/amd64

# install into local k8s
up: _skaffold-ctx down
kubectl delete cm -l kube-mgmt/e2e=true || true
skaffold deploy --build-artifacts={{skaffoldTags}}

# remove from local k8s
Expand All @@ -64,6 +68,8 @@ down:

# run only e2e test script
test-e2e-sh:
#!/usr/bin/env bash
set -euo pipefail
kubectl delete cm -l kube-mgmt/e2e=true || true
./test/e2e/{{E2E_TEST}}/test.sh

Expand All @@ -73,7 +79,10 @@ test-e2e: up test-e2e-sh
# run all e2e tests
test-e2e-all: build
#!/usr/bin/env bash
set -euxo pipefail
for E in $(find test/e2e/ -mindepth 1 -maxdepth 1 -type d -printf '%f\n'); do
set -euo pipefail
for E in $(find test/e2e/ -mindepth 1 -maxdepth 1 -type d -printf '%f\n' | sort); do
echo "================"
echo "= Running ${E} "
echo "================"
just E2E_TEST=${E} test-e2e
done
91 changes: 44 additions & 47 deletions pkg/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sort"
"strings"
"time"
"strconv"

"github.com/open-policy-agent/kube-mgmt/pkg/opa"
"github.com/sirupsen/logrus"
Expand All @@ -30,16 +31,12 @@ import (
)

const (
policyStatusAnnotationKey = "openpolicyagent.org/policy-status"

dataLabelKey = "openpolicyagent.org/data"
dataLabelValue = "opa"
dataStatusAnnotationKey = "openpolicyagent.org/data-status"

defaultRetries = 2
statusAnnotationKey = "openpolicyagent.org/kube-mgmt-status"
retriesAnnotationKey = "openpolicyagent.org/kube-mgmt-retries"
// Special namespace in Kubernetes federation that holds scheduling policies.
// commented because staticcheck: 'const kubeFederationSchedulingPolicy is unused (U1000)'
// kubeFederationSchedulingPolicy = "kube-federation-scheduling-policy"

resyncPeriod = time.Second * 60
syncResetBackoffMin = time.Second
syncResetBackoffMax = time.Second * 30
Expand Down Expand Up @@ -138,8 +135,7 @@ func (s *Sync) Run(namespaces []string) (chan struct{}, error) {
}
quit := make(chan struct{})

logrus.Infof("Policy/data ConfigMap processor connected to K8s: namespaces=%v", namespaces)

logrus.Infof("Policy/data ConfigMap processor connected to K8s: namespaces=%v", namespaces)
for _, namespace := range namespaces {
if namespace == "*" {
namespace = v1.NamespaceAll
Expand All @@ -165,22 +161,22 @@ func (s *Sync) Run(namespaces []string) (chan struct{}, error) {

func (s *Sync) add(obj interface{}) {
cm := obj.(*v1.ConfigMap)
match, isPolicy := s.matcher(cm)
logrus.Debugf("OnAdd cm=%v/%v, match=%v, isPolicy=%v", cm.Namespace, cm.Name, match, isPolicy)
if match {
if match, isPolicy := s.matcher(cm); match {
logrus.Debugf("OnAdd cm=%v/%v, isPolicy=%v", cm.Namespace, cm.Name, isPolicy)
s.syncAdd(cm, isPolicy)
}
}

func (s *Sync) update(oldObj, obj interface{}) {
oldCm, cm := oldObj.(*v1.ConfigMap), obj.(*v1.ConfigMap)
match, isPolicy := s.matcher(cm)
logrus.Debugf("OnUpdate cm=%v/%v, match=%v, isPolicy=%v", cm.Namespace, cm.Name, match, isPolicy)
if match {
if match, isPolicy := s.matcher(cm); match {
logrus.Debugf("OnUpdate cm=%v/%v, isPolicy=%v, oldVer=%v, newVer=%v",
cm.Namespace, cm.Name, isPolicy, oldCm.GetResourceVersion(), cm.GetResourceVersion())
if cm.GetResourceVersion() != oldCm.GetResourceVersion() {
fp, oldFp := fingerprint(cm), fingerprint(oldCm)
logrus.Debugf("OnUpdate cm=%v/%v, old fingerprint=%v, new fingeprint=%v", cm.Namespace, cm.Name, fp, oldFp)
if fp != oldFp {
newFp, oldFp := fingerprint(cm), fingerprint(oldCm)
rtrVal := cm.Annotations[retriesAnnotationKey]
logrus.Debugf("OnUpdate cm=%v/%v, retries=%v, oldFp=%v, newFp=%v", cm.Namespace, cm.Name, rtrVal, oldFp, newFp)
if newFp != oldFp || rtrVal != "0" {
s.syncAdd(cm, isPolicy)
}
}
Expand All @@ -197,15 +193,15 @@ func (s *Sync) delete(obj interface{}) {
obj = d.Obj
}
cm := obj.(*v1.ConfigMap)
logrus.Debugf("OnDelete cm=%v/%v", cm.Namespace, cm.Name)
if match, isPolicy := s.matcher(cm); match {
logrus.Debugf("OnDelete cm=%v/%v", cm.Namespace, cm.Name)
s.syncRemove(cm, isPolicy)
}
}

func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) {
logrus.Debugf("Attempting to add cm=%v/%v, isPolicy=%v", cm.Namespace, cm.Name, isPolicy)
path := fmt.Sprintf("%v/%v", cm.Namespace, cm.Name)
logrus.Debugf("Adding cm=%v, isPolicy=%v", path, isPolicy)
// sort keys so that errors, if any, are always in the same order
sortedKeys := make([]string, 0, len(cm.Data))
for key := range cm.Data {
Expand All @@ -216,11 +212,10 @@ func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) {
for _, key := range sortedKeys {
value := cm.Data[key]
id := fmt.Sprintf("%v/%v", path, key)

var err error
if isPolicy {
err = s.opa.InsertPolicy(id, []byte(value))
logrus.Infof("Add policy %v finished, err=%v", id, err)
logrus.Infof("Added policy %v, err=%v", id, err)
} else {
// We don't need to know the JSON structure, just pass it
// directly to the OPA data store.
Expand All @@ -229,23 +224,38 @@ func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) {
logrus.Errorf("Failed to parse JSON data in configmap with id=%s", id)
} else {
err = s.opa.PutData(id, data)
logrus.Infof("Add data %v finished, err=%v", id, err)
logrus.Infof("Added data %v, err=%v", id, err)
}
}

if err != nil {
syncErr = append(syncErr, err)
}
}
if syncErr != nil {
s.setStatusAnnotation(cm, status{
var retries int = 0
if isPolicy {
if rStr, ok := cm.Annotations[retriesAnnotationKey]; ok {
r, err := strconv.Atoi(rStr)
if err == nil && r > 0 {
retries = r - 1
logrus.Debugf("Adding policies error cm=%v, old retry=%v, new retry=%v", path, rStr, retries)
} else if err == nil && r == 0 {
retries = defaultRetries
logrus.Debugf("Adding policies error cm=%v, old retry=%v, new retry=%v", path, rStr, retries)
}
} else {
retries = defaultRetries
logrus.Debugf("Adding policies error cm=%v, no retry annotation, new retry=%v", path, retries)
}
}
s.setAnnotations(cm, status{
Status: "error",
Error: syncErr,
}, isPolicy)
}, retries)
} else {
s.setStatusAnnotation(cm, status{
s.setAnnotations(cm, status{
Status: "ok",
}, isPolicy)
}, 0)
}
}

Expand All @@ -254,7 +264,6 @@ func (s *Sync) syncRemove(cm *v1.ConfigMap, isPolicy bool) {
path := fmt.Sprintf("%v/%v", cm.Namespace, cm.Name)
for key := range cm.Data {
id := fmt.Sprintf("%v/%v", path, key)

if isPolicy {
if err := s.opa.DeletePolicy(id); err != nil {
logrus.Errorf("Failed to delete policy %v: %v", id, err)
Expand All @@ -268,36 +277,24 @@ func (s *Sync) syncRemove(cm *v1.ConfigMap, isPolicy bool) {
}
}

func (s *Sync) setStatusAnnotation(cm *v1.ConfigMap, st status, isPolicy bool) {
func (s *Sync) setAnnotations(cm *v1.ConfigMap, st status, retries int) {
bs, err := json.Marshal(st)

statusAnnotationKey := policyStatusAnnotationKey
if !isPolicy {
statusAnnotationKey = dataStatusAnnotationKey
}
if err != nil {
logrus.Errorf("Failed to serialize %v for %v/%v: %v", statusAnnotationKey, cm.Namespace, cm.Name, err)
}
annotation := string(bs)
if cm.Annotations != nil {
if existing, ok := cm.Annotations[policyStatusAnnotationKey]; ok {
if existing == annotation {
// If the annotation did not change, do not write it.
// (issue https://github.com/open-policy-agent/kube-mgmt/issues/90)
return
}
}
logrus.Errorf("Failed to serialize status for cm=%v/%v, err=%v", cm.Namespace, cm.Name, err)
return
}
patch := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{
policyStatusAnnotationKey: annotation,
statusAnnotationKey: string(bs),
retriesAnnotationKey: strconv.Itoa(retries),
},
},
}
bs, err = json.Marshal(patch)
if err != nil {
logrus.Errorf("Failed to serialize patch for %v/%v: %v", cm.Namespace, cm.Name, err)
return
}
_, err = s.clientset.CoreV1().ConfigMaps(cm.Namespace).Patch(context.TODO(), cm.Name, types.StrategicMergePatchType, bs, metav1.PatchOptions{})
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions test/e2e/default/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ set -e
set -x

TOKEN=$(kubectl exec deploy/kube-mgmt-opa-kube-mgmt -c mgmt -- cat /bootstrap/mgmt-token)
OPA="http --default-scheme=https --verify=no -A bearer -a ${TOKEN} :8443/v1"
OPA="http --ignore-stdin --default-scheme=https --verify=no -A bearer -a ${TOKEN} :8443/v1"

${OPA}/data | jq -e '.result.test_helm_kubernetes_quickstart|keys|length==3'

kubectl apply -f "$(dirname $0)/../fixture.yaml"

${OPA}/policies | jq -e '.result|any(.id=="default/policy-include/include.rego")==true'
${OPA}/data/example/include/allow | jq -e '.result|true'
${OPA}/data/example/include/allow | jq -e '.result==true'

${OPA}/data/default | jq -e '.result|keys==["data-include"]'
${OPA}/data/default/data-include | jq -e '.result["include.json"].inKey=="inValue"'

kubectl get cm -l openpolicyagent.org/policy=rego -ojson | \
jq -e '.items[].metadata.annotations["openpolicyagent.org/kube-mgmt-status"]|fromjson|.status=="ok"'

kubectl get cm -l openpolicyagent.org/data=opa -ojson | \
jq -e '.items[].metadata.annotations["openpolicyagent.org/kube-mgmt-status"]|fromjson|.status=="ok"'
4 changes: 2 additions & 2 deletions test/e2e/fixture-labels.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
include.rego: |
package example.include
allow := true
---
---
kind: ConfigMap
metadata:
name: policy-exclude
Expand All @@ -33,7 +33,7 @@ apiVersion: v1
data:
include.json: |
{"inKey": "inValue"}
---
---
kind: ConfigMap
metadata:
name: data-exclude
Expand Down
16 changes: 16 additions & 0 deletions test/e2e/fixture-multi-fail.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
kind: ConfigMap
metadata:
name: multi-file-fail-policy
labels:
kube-mgmt/e2e: "true"
openpolicyagent.org/policy: rego
apiVersion: v1
data:
f.rego: |
package my_pkg_fail
import data.my_pkg_fail.functions.my_func
default my_rule := false
my_rule {
my_func(input.hello)
}
19 changes: 19 additions & 0 deletions test/e2e/fixture-multi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
kind: ConfigMap
metadata:
name: multi-file-policy
labels:
kube-mgmt/e2e: "true"
openpolicyagent.org/policy: rego
apiVersion: v1
data:
a.rego: |
package my_pkg
import data.my_pkg.functions.my_func
default my_rule := false
my_rule {
my_func(input.hello)
}
b.rego: |
package my_pkg.functions
my_func(str) := startswith("world", str)
4 changes: 2 additions & 2 deletions test/e2e/fixture.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
include.rego: |
package example.include
allow := true
---
---
kind: ConfigMap
metadata:
name: policy-exclude
Expand All @@ -33,7 +33,7 @@ apiVersion: v1
data:
include.json: |
{"inKey": "inValue"}
---
---
kind: ConfigMap
metadata:
name: data-exclude
Expand Down
10 changes: 8 additions & 2 deletions test/e2e/labels/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@
set -e
set -x

OPA="http :8080/v1"
OPA="http --ignore-stdin :8080/v1"

${OPA}/data | jq -e '.result.default//{}|keys|length==0'

kubectl apply -f "$(dirname $0)/../fixture-labels.yaml"

${OPA}/policies | jq -e '.result[].id=="default/policy-include/include.rego"'
${OPA}/data/example/include/allow | jq -e '.result|true'
${OPA}/data/example/include/allow | jq -e '.result==true'

${OPA}/data/default | jq -e '.result|keys==["data-include"]'
${OPA}/data/default/data-include | jq -e '.result["include.json"].inKey=="inValue"'

kubectl get cm -l qweqwe/policy=111 -ojson | \
jq -e '.items[].metadata.annotations["openpolicyagent.org/kube-mgmt-status"]|fromjson|.status=="ok"'

kubectl get cm -l asdasd/data=222 -ojson | \
jq -e '.items[].metadata.annotations["openpolicyagent.org/kube-mgmt-status"]|fromjson|.status=="ok"'
Loading