Various updates to the revision reconciler
1. PA Reachability now depends on the status of the Deployment

If we have available replicas, we don't mark the revision as
unreachable. This allows ongoing requests to be handled (sketched below).

2. Always propagate the K8s Deployment Status to the Revision.

We don't need to gate this on whether the Revision required
activation, since the only two conditions we propagate
from the Deployment are Progressing and ReplicaSetFailure=False.

3. Mark Revision as Deploying if the PA's service name isn't set
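
For illustration, a minimal sketch of the reachability decision behind change (1), assuming a helper alongside MakePA in pkg/reconciler/revision/resources/pa.go (see the pa.go diff below). The fallback value is simplified to ReachabilityUnknown, so this is a sketch of the idea rather than the exact implementation.

package resources

import (
    appsv1 "k8s.io/api/apps/v1"

    "knative.dev/pkg/apis"
    autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
    v1 "knative.dev/serving/pkg/apis/serving/v1"
)

// reachabilitySketch (hypothetical name) marks a revision unreachable only when
// an infra failure coincides with a Deployment that wants replicas but has none
// ready; if replicas are still serving, the revision stays reachable so ongoing
// requests can be handled.
func reachabilitySketch(rev *v1.Revision, deployment *appsv1.Deployment) autoscalingv1alpha1.ReachabilityType {
    infraFailure := false
    for _, cond := range []apis.ConditionType{
        v1.RevisionConditionResourcesAvailable,
        v1.RevisionConditionContainerHealthy,
    } {
        if c := rev.Status.GetCondition(cond); c != nil && c.IsFalse() {
            infraFailure = true
            break
        }
    }

    if infraFailure && deployment != nil && deployment.Spec.Replicas != nil &&
        *deployment.Spec.Replicas > 0 && deployment.Status.ReadyReplicas == 0 {
        return autoscalingv1alpha1.ReachabilityUnreachable
    }
    return autoscalingv1alpha1.ReachabilityUnknown // simplified fallback
}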
dprotaso committed Jan 26, 2024
1 parent 19e5492 commit 23247c1
Showing 13 changed files with 99 additions and 99 deletions.
1 change: 1 addition & 0 deletions pkg/apis/serving/k8s_lifecycle.go
@@ -50,6 +50,7 @@ func TransformDeploymentStatus(ds *appsv1.DeploymentStatus) *duckv1.Status {
// The absence of this condition means no failure has occurred. If we find it
// below, we'll overwrite this.
depCondSet.Manage(s).MarkTrue(DeploymentConditionReplicaSetReady)
depCondSet.Manage(s).MarkUnknown(DeploymentConditionProgressing, "Deploying", "")

conds := []appsv1.DeploymentConditionType{
appsv1.DeploymentProgressing,
2 changes: 2 additions & 0 deletions pkg/apis/serving/k8s_lifecycle_test.go
@@ -40,12 +40,14 @@ func TestTransformDeploymentStatus(t *testing.T) {
Conditions: []apis.Condition{{
Type: DeploymentConditionProgressing,
Status: corev1.ConditionUnknown,
Reason: "Deploying",
}, {
Type: DeploymentConditionReplicaSetReady,
Status: corev1.ConditionTrue,
}, {
Type: DeploymentConditionReady,
Status: corev1.ConditionUnknown,
Reason: "Deploying",
}},
},
}, {
7 changes: 0 additions & 7 deletions pkg/apis/serving/v1/revision_helpers.go
@@ -19,7 +19,6 @@ package v1
import (
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
net "knative.dev/networking/pkg/apis/networking"
"knative.dev/pkg/kmeta"
@@ -144,9 +143,3 @@ func (rs *RevisionStatus) IsActivationRequired() bool {
c := revisionCondSet.Manage(rs).GetCondition(RevisionConditionActive)
return c != nil && c.Status != corev1.ConditionTrue
}

// IsReplicaSetFailure returns true if the deployment replicaset failed to create
func (rs *RevisionStatus) IsReplicaSetFailure(deploymentStatus *appsv1.DeploymentStatus) bool {
ds := serving.TransformDeploymentStatus(deploymentStatus)
return ds != nil && ds.GetCondition(serving.DeploymentConditionReplicaSetReady).IsFalse()
}
43 changes: 0 additions & 43 deletions pkg/apis/serving/v1/revision_helpers_test.go
@@ -20,7 +20,6 @@ import (
"testing"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"

@@ -268,45 +267,3 @@ func TestSetRoutingState(t *testing.T) {
t.Error("Expected default value for unparsable annotationm but got:", got)
}
}

func TestIsReplicaSetFailure(t *testing.T) {
revisionStatus := RevisionStatus{}
cases := []struct {
name string
status appsv1.DeploymentStatus
IsReplicaSetFailure bool
}{{
name: "empty deployment status should not be a failure",
status: appsv1.DeploymentStatus{},
}, {
name: "Ready deployment status should not be a failure",
status: appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{{
Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue,
}},
},
}, {
name: "ReplicasetFailure true should be a failure",
status: appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{{
Type: appsv1.DeploymentReplicaFailure, Status: corev1.ConditionTrue,
}},
},
IsReplicaSetFailure: true,
}, {
name: "ReplicasetFailure false should not be a failure",
status: appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{{
Type: appsv1.DeploymentReplicaFailure, Status: corev1.ConditionFalse,
}},
},
}}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got, want := revisionStatus.IsReplicaSetFailure(&tc.status), tc.IsReplicaSetFailure; got != want {
t.Errorf("IsReplicaSetFailure = %v, want: %v", got, want)
}
})
}
}
15 changes: 13 additions & 2 deletions pkg/apis/serving/v1/revision_lifecycle.go
@@ -170,6 +170,8 @@ func (rs *RevisionStatus) PropagateDeploymentStatus(original *appsv1.DeploymentS

// PropagateAutoscalerStatus propagates autoscaler's status to the revision's status.
func (rs *RevisionStatus) PropagateAutoscalerStatus(ps *autoscalingv1alpha1.PodAutoscalerStatus) {
resUnavailable := rs.GetCondition(RevisionConditionResourcesAvailable).IsFalse()

// Reflect the PA status in our own.
cond := ps.GetCondition(autoscalingv1alpha1.PodAutoscalerConditionReady)
rs.ActualReplicas = nil
@@ -183,20 +185,29 @@ func (rs *RevisionStatus) PropagateAutoscalerStatus(ps *autoscalingv1alpha1.PodA
}

if cond == nil {
rs.MarkActiveUnknown("Deploying", "")
rs.MarkActiveUnknown(ReasonDeploying, "")

if !resUnavailable {
rs.MarkResourcesAvailableUnknown(ReasonDeploying, "")
}
return
}

// Don't mark the resources available, if deployment status already determined
// it isn't so.
resUnavailable := rs.GetCondition(RevisionConditionResourcesAvailable).IsFalse()
if ps.IsScaleTargetInitialized() && !resUnavailable {
// Precondition for PA being initialized is SKS being active and
// that implies that |service.endpoints| > 0.
rs.MarkResourcesAvailableTrue()
rs.MarkContainerHealthyTrue()
}

// Mark resource unavailable if we don't have a Service Name and the deployment is ready
// This can happen when we have initial scale set to 0
if rs.GetCondition(RevisionConditionResourcesAvailable).IsTrue() && ps.ServiceName == "" {
rs.MarkResourcesAvailableUnknown(ReasonDeploying, "")
}

switch cond.Status {
case corev1.ConditionUnknown:
rs.MarkActiveUnknown(cond.Reason, cond.Message)
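
To illustrate change (3), a hedged usage sketch of the new ServiceName guard in PropagateAutoscalerStatus: even with the PA reporting Ready and the scale target initialized, an empty ServiceName (possible when initial scale is 0) pushes ResourcesAvailable back to Unknown/"Deploying". The condition values below are illustrative only, not taken from a real cluster.

package v1_test

import (
    corev1 "k8s.io/api/core/v1"

    duckv1 "knative.dev/pkg/apis/duck/v1"
    autoscalingv1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
    v1 "knative.dev/serving/pkg/apis/serving/v1"
)

// demoServiceNameGuard is a sketch, not part of the commit.
func demoServiceNameGuard() {
    rs := &v1.RevisionStatus{}
    rs.InitializeConditions()

    // PA is Ready and the scale target is initialized, but the private service
    // name has not been set yet.
    rs.PropagateAutoscalerStatus(&autoscalingv1alpha1.PodAutoscalerStatus{
        ServiceName: "",
        Status: duckv1.Status{
            Conditions: duckv1.Conditions{{
                Type:   autoscalingv1alpha1.PodAutoscalerConditionScaleTargetInitialized,
                Status: corev1.ConditionTrue,
            }, {
                Type:   autoscalingv1alpha1.PodAutoscalerConditionReady,
                Status: corev1.ConditionTrue,
            }},
        },
    })

    // ResourcesAvailable is now Unknown with reason "Deploying" rather than True.
    _ = rs.GetCondition(v1.RevisionConditionResourcesAvailable)
}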
4 changes: 4 additions & 0 deletions pkg/apis/serving/v1/revision_lifecycle_test.go
@@ -537,6 +537,7 @@ func TestPropagateAutoscalerStatus(t *testing.T) {

// PodAutoscaler becomes ready, making us active.
r.PropagateAutoscalerStatus(&autoscalingv1alpha1.PodAutoscalerStatus{
ServiceName: "some-service",
Status: duckv1.Status{
Conditions: duckv1.Conditions{{
Type: autoscalingv1alpha1.PodAutoscalerConditionReady,
@@ -552,6 +553,7 @@ func TestPropagateAutoscalerStatus(t *testing.T) {

// PodAutoscaler flipping back to Unknown causes Active become ongoing immediately.
r.PropagateAutoscalerStatus(&autoscalingv1alpha1.PodAutoscalerStatus{
ServiceName: "some-service",
Status: duckv1.Status{
Conditions: duckv1.Conditions{{
Type: autoscalingv1alpha1.PodAutoscalerConditionReady,
@@ -567,6 +569,7 @@ func TestPropagateAutoscalerStatus(t *testing.T) {

// PodAutoscaler becoming unready makes Active false, but doesn't affect readiness.
r.PropagateAutoscalerStatus(&autoscalingv1alpha1.PodAutoscalerStatus{
ServiceName: "some-service",
Status: duckv1.Status{
Conditions: duckv1.Conditions{{
Type: autoscalingv1alpha1.PodAutoscalerConditionReady,
@@ -685,6 +688,7 @@ func TestPropagateAutoscalerStatusRace(t *testing.T) {

// The PodAutoscaler might have been ready but it's scaled down already.
r.PropagateAutoscalerStatus(&autoscalingv1alpha1.PodAutoscalerStatus{
ServiceName: "some-service",
Status: duckv1.Status{
Conditions: duckv1.Conditions{{
Type: autoscalingv1alpha1.PodAutoscalerConditionReady,
16 changes: 8 additions & 8 deletions pkg/reconciler/autoscaling/kpa/kpa_test.go
@@ -221,7 +221,7 @@ func markScaleTargetInitialized(pa *autoscalingv1alpha1.PodAutoscaler) {

func kpa(ns, n string, opts ...PodAutoscalerOption) *autoscalingv1alpha1.PodAutoscaler {
rev := newTestRevision(ns, n)
kpa := revisionresources.MakePA(rev)
kpa := revisionresources.MakePA(rev, nil)
kpa.Generation = 1
kpa.Annotations[autoscaling.ClassAnnotationKey] = "kpa.autoscaling.knative.dev"
kpa.Annotations[autoscaling.MetricAnnotationKey] = "concurrency"
@@ -1303,7 +1303,7 @@ func TestGlobalResyncOnUpdateAutoscalerConfigMap(t *testing.T) {
rev := newTestRevision(testNamespace, testRevision)
newDeployment(ctx, t, fakedynamicclient.Get(ctx), testRevision+"-deployment", 3)

kpa := revisionresources.MakePA(rev)
kpa := revisionresources.MakePA(rev, nil)
sks := aresources.MakeSKS(kpa, nv1a1.SKSOperationModeServe, minActivators)
sks.Status.PrivateServiceName = "bogus"
sks.Status.InitializeConditions()
@@ -1372,7 +1372,7 @@ func TestReconcileDeciderCreatesAndDeletes(t *testing.T) {

newDeployment(ctx, t, fakedynamicclient.Get(ctx), testRevision+"-deployment", 3)

kpa := revisionresources.MakePA(rev)
kpa := revisionresources.MakePA(rev, nil)
sks := sks(testNamespace, testRevision, WithDeployRef(kpa.Spec.ScaleTargetRef.Name), WithSKSReady)
fakenetworkingclient.Get(ctx).NetworkingV1alpha1().ServerlessServices(testNamespace).Create(ctx, sks, metav1.CreateOptions{})
fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{})
@@ -1446,7 +1446,7 @@ func TestUpdate(t *testing.T) {
fakekubeclient.Get(ctx).CoreV1().Pods(testNamespace).Create(ctx, pod, metav1.CreateOptions{})
fakefilteredpodsinformer.Get(ctx, serving.RevisionUID).Informer().GetIndexer().Add(pod)

kpa := revisionresources.MakePA(rev)
kpa := revisionresources.MakePA(rev, nil)
kpa.SetDefaults(context.Background())
fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{})
fakepainformer.Get(ctx).Informer().GetIndexer().Add(kpa)
@@ -1525,7 +1525,7 @@ func TestControllerCreateError(t *testing.T) {
createErr: want,
})

kpa := revisionresources.MakePA(newTestRevision(testNamespace, testRevision))
kpa := revisionresources.MakePA(newTestRevision(testNamespace, testRevision), nil)
fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{})
fakepainformer.Get(ctx).Informer().GetIndexer().Add(kpa)

@@ -1568,7 +1568,7 @@ func TestControllerUpdateError(t *testing.T) {
createErr: want,
})

kpa := revisionresources.MakePA(newTestRevision(testNamespace, testRevision))
kpa := revisionresources.MakePA(newTestRevision(testNamespace, testRevision), nil)
fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{})
fakepainformer.Get(ctx).Informer().GetIndexer().Add(kpa)

@@ -1610,7 +1610,7 @@ func TestControllerGetError(t *testing.T) {
getErr: want,
})

kpa := revisionresources.MakePA(newTestRevision(testNamespace, testRevision))
kpa := revisionresources.MakePA(newTestRevision(testNamespace, testRevision), nil)
fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{})
fakepainformer.Get(ctx).Informer().GetIndexer().Add(kpa)

@@ -1649,7 +1649,7 @@ func TestScaleFailure(t *testing.T) {

// Only put the KPA in the lister, which will prompt failures scaling it.
rev := newTestRevision(testNamespace, testRevision)
kpa := revisionresources.MakePA(rev)
kpa := revisionresources.MakePA(rev, nil)
fakepainformer.Get(ctx).Informer().GetIndexer().Add(kpa)

newDeployment(ctx, t, fakedynamicclient.Get(ctx), testRevision+"-deployment", 3)
2 changes: 1 addition & 1 deletion pkg/reconciler/autoscaling/kpa/scaler_test.go
@@ -640,7 +640,7 @@ func TestDisableScaleToZero(t *testing.T) {

func newKPA(ctx context.Context, t *testing.T, servingClient clientset.Interface, revision *v1.Revision) *autoscalingv1alpha1.PodAutoscaler {
t.Helper()
pa := revisionresources.MakePA(revision)
pa := revisionresources.MakePA(revision, nil)
pa.Status.InitializeConditions()
_, err := servingClient.AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, pa, metav1.CreateOptions{})
if err != nil {
8 changes: 6 additions & 2 deletions pkg/reconciler/revision/cruds.go
@@ -115,7 +115,11 @@ func (c *Reconciler) createImageCache(ctx context.Context, rev *v1.Revision, con
return c.cachingclient.CachingV1alpha1().Images(image.Namespace).Create(ctx, image, metav1.CreateOptions{})
}

func (c *Reconciler) createPA(ctx context.Context, rev *v1.Revision) (*autoscalingv1alpha1.PodAutoscaler, error) {
pa := resources.MakePA(rev)
func (c *Reconciler) createPA(
ctx context.Context,
rev *v1.Revision,
deployment *appsv1.Deployment,
) (*autoscalingv1alpha1.PodAutoscaler, error) {
pa := resources.MakePA(rev, deployment)
return c.client.AutoscalingV1alpha1().PodAutoscalers(pa.Namespace).Create(ctx, pa, metav1.CreateOptions{})
}
42 changes: 17 additions & 25 deletions pkg/reconciler/revision/reconcile_resources.go
@@ -49,42 +49,27 @@ func (c *Reconciler) reconcileDeployment(ctx context.Context, rev *v1.Revision)
// Deployment does not exist. Create it.
rev.Status.MarkResourcesAvailableUnknown(v1.ReasonDeploying, "")
rev.Status.MarkContainerHealthyUnknown(v1.ReasonDeploying, "")
deployment, err = c.createDeployment(ctx, rev)
if err != nil {
if _, err = c.createDeployment(ctx, rev); err != nil {
return fmt.Errorf("failed to create deployment %q: %w", deploymentName, err)
}
logger.Infof("Created deployment %q", deploymentName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get deployment %q: %w", deploymentName, err)
} else if !metav1.IsControlledBy(deployment, rev) {
// Surface an error in the revision's status, and return an error.
rev.Status.MarkResourcesAvailableFalse(v1.ReasonNotOwned, v1.ResourceNotOwnedMessage("Deployment", deploymentName))
return fmt.Errorf("revision: %q does not own Deployment: %q", rev.Name, deploymentName)
} else {
// The deployment exists, but make sure that it has the shape that we expect.
deployment, err = c.checkAndUpdateDeployment(ctx, rev, deployment)
if err != nil {
return fmt.Errorf("failed to update deployment %q: %w", deploymentName, err)
}

// Now that we have a Deployment, determine whether there is any relevant
// status to surface in the Revision.
//
// TODO(jonjohnsonjr): Should we check Generation != ObservedGeneration?
// The autoscaler mutates the deployment pretty often, which would cause us
// to flip back and forth between Ready and Unknown every time we scale up
// or down.
if !rev.Status.IsActivationRequired() {
rev.Status.PropagateDeploymentStatus(&deployment.Status)
}
}

// If the replicaset is failing we assume it's an error we have to surface
if rev.Status.IsReplicaSetFailure(&deployment.Status) {
rev.Status.PropagateDeploymentStatus(&deployment.Status)
return nil
// The deployment exists, but make sure that it has the shape that we expect.
deployment, err = c.checkAndUpdateDeployment(ctx, rev, deployment)
if err != nil {
return fmt.Errorf("failed to update deployment %q: %w", deploymentName, err)
}

rev.Status.PropagateDeploymentStatus(&deployment.Status)

// If a container keeps crashing (no active pods in the deployment although we want some)
if *deployment.Spec.Replicas > 0 && deployment.Status.AvailableReplicas == 0 {
pods, err := c.kubeclient.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(deployment.Spec.Selector)})
@@ -151,14 +136,21 @@ func (c *Reconciler) reconcileImageCache(ctx context.Context, rev *v1.Revision)

func (c *Reconciler) reconcilePA(ctx context.Context, rev *v1.Revision) error {
ns := rev.Namespace

deploymentName := resourcenames.Deployment(rev)
deployment, err := c.deploymentLister.Deployments(ns).Get(deploymentName)
if err != nil {
return err
}

paName := resourcenames.PA(rev)
logger := logging.FromContext(ctx)
logger.Info("Reconciling PA: ", paName)

pa, err := c.podAutoscalerLister.PodAutoscalers(ns).Get(paName)
if apierrs.IsNotFound(err) {
// PA does not exist. Create it.
pa, err = c.createPA(ctx, rev)
pa, err = c.createPA(ctx, rev, deployment)
if err != nil {
return fmt.Errorf("failed to create PA %q: %w", paName, err)
}
@@ -173,7 +165,7 @@ func (c *Reconciler) reconcilePA(ctx context.Context, rev *v1.Revision) error {

// Perhaps the PA spec changed underneath us?
// We no longer require immutability, so need to reconcile PA each time.
tmpl := resources.MakePA(rev)
tmpl := resources.MakePA(rev, deployment)
logger.Debugf("Desired PASpec: %#v", tmpl.Spec)
if !equality.Semantic.DeepEqual(tmpl.Spec, pa.Spec) {
diff, _ := kmp.SafeDiff(tmpl.Spec, pa.Spec) // Can't realistically fail on PASpec.
20 changes: 14 additions & 6 deletions pkg/reconciler/revision/resources/pa.go
@@ -17,6 +17,7 @@ limitations under the License.
package resources

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -28,7 +29,7 @@ import (
)

// MakePA makes a Knative Pod Autoscaler resource from a revision.
func MakePA(rev *v1.Revision) *autoscalingv1alpha1.PodAutoscaler {
func MakePA(rev *v1.Revision, deployment *appsv1.Deployment) *autoscalingv1alpha1.PodAutoscaler {
return &autoscalingv1alpha1.PodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: names.PA(rev),
@@ -45,20 +46,27 @@ func MakePA(rev *v1.Revision, deployment *appsv1.Deployment) *autoscalingv1alpha1.PodAutoscaler {
Name: names.Deployment(rev),
},
ProtocolType: rev.GetProtocol(),
Reachability: reachability(rev),
Reachability: reachability(rev, deployment),
},
}
}

func reachability(rev *v1.Revision) autoscalingv1alpha1.ReachabilityType {
func reachability(rev *v1.Revision, deployment *appsv1.Deployment) autoscalingv1alpha1.ReachabilityType {
// check infra failures
conds := []apis.ConditionType{
infraFailure := false
for _, cond := range []apis.ConditionType{
v1.RevisionConditionResourcesAvailable,
v1.RevisionConditionContainerHealthy,
} {
if c := rev.Status.GetCondition(cond); c != nil && c.IsFalse() {
infraFailure = true
break
}
}

for _, cond := range conds {
if c := rev.Status.GetCondition(cond); c != nil && c.IsFalse() {
if infraFailure && deployment != nil && deployment.Spec.Replicas != nil {
// If we have an infra failure and no ready replicas - then this revision is unreachable
if *deployment.Spec.Replicas > 0 && deployment.Status.ReadyReplicas == 0 {
return autoscalingv1alpha1.ReachabilityUnreachable
}
}