Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement gateway class controller in Rust #360

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions controlplane/src/gateway_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ limitations under the License.
*/

use futures::StreamExt;
use log::debug;
use std::{
ops::Sub,
sync::Arc,
time::{Duration, Instant},
};
use utils::set_condition;

use crate::*;
use gateway_api::apis::standard::gateways::{Gateway, GatewayStatus};
Expand Down Expand Up @@ -74,6 +76,14 @@ pub async fn reconcile(gateway: Arc<Gateway>, ctx: Arc<Context>) -> Result<Actio
"found a supported GatewayClass: {:?}",
gateway_class.name_any()
);
// Only reconcile the Gateway object if our GatewayClass has already been accepted
EandrewJones marked this conversation as resolved.
Show resolved Hide resolved
if !gatewayclass_utils::is_accepted(&gateway_class) {
debug!(
"GatewayClass {:?} not yet accepted",
gateway_class.name_any()
);
return Ok(Action::await_change());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So right now I think this will introduce a bug because below in our controller function I don't see that we've set up any watch mechanism that will re-enqueue the Gateway once the GatewayClass is finally accepted. We would need that to make sure that GatewayClass changes trigger their connected Gateways getting enqueued 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just re-enqueue here? Wouldn't that cause the gateway to keep re-enqueuing until it's GatewayClass was accepted?

That's not ideal because it's tantamount to naively long polling for changes to your GatewayClass that may never come versus the GatewayClass change driving the behavior.

Is there a an API for the watch mechanism you describe? If so, I can implement that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just re-enqueue here? Wouldn't that cause the gateway to keep re-enqueuing until it's GatewayClass was accepted?
That's not ideal because it's tantamount to naively long polling for changes to your GatewayClass that may never come versus the GatewayClass change driving the behavior.

It still wouldn't be quite right because of the problem you described, and in a case where something is broken it would just endlessly re-enqueue.

Is there a an API for the watch mechanism you describe? If so, I can implement that.

Yes, there are ways to watch for changes to related or owned objects, and then enqueue the related object when the watch object changes. Look at the owns() (for when there's specifically an owner relationship) and watches() functions and other related functions in kube::runtime::Controller, there are examples in there as well. I think what we'll probably want to do is run a watch on GatewayClass that re-enqueues any related Gateways when that class changes whenever it matches our controllerName 🤔

}

set_listener_status(&mut gw)?;
let accepted_cond = get_accepted_condition(&gw);
Expand Down Expand Up @@ -127,11 +137,11 @@ pub async fn reconcile(gateway: Arc<Gateway>, ctx: Arc<Context>) -> Result<Actio
let updated = update_service_for_gateway(gateway.as_ref(), &mut service)?;
if updated {
info!("drift detected; updating loadbalancer service");
let patch_parmas = PatchParams::default();
let patch_params = PatchParams::default();
shaneutt marked this conversation as resolved.
Show resolved Hide resolved
service_api
.patch(
val.name_any().as_str(),
&patch_parmas,
&patch_params,
&Patch::Strategic(&service),
)
.await
Expand Down
24 changes: 0 additions & 24 deletions controlplane/src/gateway_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,30 +314,6 @@ pub async fn patch_status(
Ok(())
}

// Sets the provided condition on the Gateway object. The condition on the Gateway is only updated
// if the new condition has a different status (except for the observed generation which is always
// updated).
pub fn set_condition(gateway: &mut Gateway, new_cond: metav1::Condition) {
if let Some(ref mut status) = gateway.status {
if let Some(ref mut conditions) = status.conditions {
for condition in conditions.iter_mut() {
if condition.type_ == new_cond.type_ {
if condition.status == new_cond.status {
// always update the observed generation
condition.observed_generation = new_cond.observed_generation;
return;
}
*condition = new_cond;
return;
}
}
conditions.push(new_cond);
} else {
status.conditions = Some(vec![new_cond]);
}
}
}

// Inspects the provided Gateway and returns a Condition of type "Accepted" with appropriate reason and status.
// Ideally, this should be called after the Gateway object reflects the latest status of its
// listeners.
Expand Down
90 changes: 90 additions & 0 deletions controlplane/src/gatewayclass_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use futures::StreamExt;
use std::{
ops::Sub,
sync::Arc,
time::{Duration, Instant},
};

use crate::*;
use gateway_api::apis::standard::gatewayclasses::GatewayClass;
use kube::{
api::{Api, ListParams},
runtime::{controller::Action, watcher::Config, Controller},
};

use gatewayclass_utils::*;
use tracing::*;

pub async fn reconcile(gateway_class: Arc<GatewayClass>, ctx: Arc<Context>) -> Result<Action> {
let start = Instant::now();
let client = ctx.client.clone();
let name = gateway_class
.metadata
.name
.clone()
.ok_or(Error::InvalidConfigError("invalid name".to_string()))?;
EandrewJones marked this conversation as resolved.
Show resolved Hide resolved

let gatewayclass_api = Api::<GatewayClass>::all(client);
EandrewJones marked this conversation as resolved.
Show resolved Hide resolved
let mut gwc = GatewayClass {
metadata: gateway_class.metadata.clone(),
spec: gateway_class.spec.clone(),
status: gateway_class.status.clone(),
// NOTE: Am I missing anything else here?
};
Comment on lines +43 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the goal is to make a full clone, you should be able to do that with Deref and clone:

Suggested change
let mut gwc = GatewayClass {
metadata: gateway_class.metadata.clone(),
spec: gateway_class.spec.clone(),
status: gateway_class.status.clone(),
// NOTE: Am I missing anything else here?
};
let mut gwc = (*gateway_class).clone();

Copy link
Contributor Author

@EandrewJones EandrewJones Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is indeed the aim, unless I can get away with not cloning at all. But we ultimately would need a thread-safe mutable reference (Arc<Mutex<GatewayClass>>) to our gateway_class input then, but AFAICT from the kube.rs docs, reconcile will always be passed an Arc<> of the resource it's watching. Is that so?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will always be an Arc, so if you want the inner of an arc you can do a deref clone (as shown above) OR an as_ref().clone().


if gateway_class.spec.controller_name != GATEWAY_CLASS_CONTROLLER_NAME {
// Skip reconciling because we don't manage this resource
// NOTE: May want to requeue in case this resource becomes relevant again in the
// future (e.g. the controllerName is changed to match ours in the event of typo,
// etc.)
return Ok(Action::requeue(Duration::from_secs(3600 / 2)));
EandrewJones marked this conversation as resolved.
Show resolved Hide resolved
}

if !is_accepted(&gateway_class) {
info!("marking gateway class {:?} as accepted", name);
accept(&mut gwc);
patch_status(&gatewayclass_api, name, &gwc.status.unwrap_or_default()).await?;
EandrewJones marked this conversation as resolved.
Show resolved Hide resolved
}

let duration = Instant::now().sub(start);
info!("finished reconciling in {:?} ms", duration.as_millis());
Ok(Action::await_change())
}

pub async fn controller(ctx: Context) -> Result<()> {
let gwc_api = Api::<GatewayClass>::all(ctx.client.clone());
gwc_api
.list(&ListParams::default().limit(1))
.await
.map_err(Error::CRDNotFoundError)?;

Controller::new(gwc_api, Config::default().any_semantic())
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(ctx))
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()))
.await;

Ok(())
}

fn error_policy(_: Arc<GatewayClass>, error: &Error, _: Arc<Context>) -> Action {
warn!("reconcile failed: {:?}", error);
Action::requeue(Duration::from_secs(5))
}
77 changes: 77 additions & 0 deletions controlplane/src/gatewayclass_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use crate::*;
use gateway_api::apis::standard::{
constants::{GatewayConditionReason, GatewayConditionType},
gatewayclasses::{GatewayClass, GatewayClassStatus},
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;

use chrono::Utc;
use kube::api::{Api, Patch, PatchParams};
use serde_json::json;
use utils::set_condition;

pub fn is_accepted(gateway_class: &GatewayClass) -> bool {
let mut accepted = false;
if let Some(status) = &gateway_class.status {
if let Some(conditions) = &status.conditions {
for condition in conditions {
accepted = condition.type_ == GatewayConditionType::Accepted.to_string()
&& condition.status == "True"
}
}
}
accepted
}

pub fn accept(gateway_class: &mut GatewayClass) {
let now = metav1::Time(Utc::now());
let accepted = metav1::Condition {
type_: GatewayConditionType::Accepted.to_string(),
status: String::from("True"),
reason: GatewayConditionReason::Accepted.to_string(),
observed_generation: gateway_class.metadata.generation,
last_transition_time: now,
message: String::from("Blixt accepts responsibility for this GatewayClass"),
};
set_condition(gateway_class, accepted);
}

pub async fn patch_status(
gatewayclass_api: &Api<GatewayClass>,
name: String,
status: &GatewayClassStatus,
) -> Result<()> {
let mut conditions = &vec![];
if let Some(c) = status.conditions.as_ref() {
conditions = c;
}
let patch = Patch::Apply(json!({
"apiVersion": "gateway.networking.k8s.io/v1",
"kind": "GatewayClass",
"status": {
"conditions": conditions
}
}));
let params = PatchParams::apply(BLIXT_FIELD_MANAGER).force();
gatewayclass_api
.patch_status(name.as_str(), &params, &patch)
.await
.map_err(Error::KubeError)?;
Ok(())
}
4 changes: 4 additions & 0 deletions controlplane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use thiserror::Error;

pub mod gateway_controller;
pub mod gateway_utils;
pub mod gatewayclass_controller;
pub mod gatewayclass_utils;
mod traits;
pub mod utils;

// Context for our reconciler
#[derive(Clone)]
Expand Down
18 changes: 18 additions & 0 deletions controlplane/src/traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use gateway_api::apis::standard::{gatewayclasses::GatewayClass, gateways::Gateway};
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;

pub trait HasConditions {
fn get_conditions_mut(&mut self) -> &mut Option<Vec<metav1::Condition>>;
}

impl HasConditions for Gateway {
fn get_conditions_mut(&mut self) -> &mut Option<Vec<metav1::Condition>> {
&mut self.status.as_mut().unwrap().conditions
}
}

impl HasConditions for GatewayClass {
fn get_conditions_mut(&mut self) -> &mut Option<Vec<metav1::Condition>> {
&mut self.status.as_mut().unwrap().conditions
}
}
27 changes: 27 additions & 0 deletions controlplane/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::traits::HasConditions;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;

// Sets the provided condition on any Gateway API object so log as it implements
// the HasConditions trait.
//
// The condition on the object is only updated
// if the new condition has a different status (except for the observed generation which is always
// updated).
pub fn set_condition<T: HasConditions>(obj: &mut T, new_cond: metav1::Condition) {
if let Some(ref mut conditions) = obj.get_conditions_mut() {
for condition in conditions.iter_mut() {
if condition.type_ == new_cond.type_ {
if condition.status == new_cond.status {
// always update the observed generation
condition.observed_generation = new_cond.observed_generation;
return;
}
*condition = new_cond;
return;
}
}
conditions.push(new_cond);
} else {
obj.get_conditions_mut().replace(vec![new_cond]);
}
}
Loading