diff --git a/controlplane/src/gateway_controller.rs b/controlplane/src/gateway_controller.rs index bf77704b..fda64cf3 100644 --- a/controlplane/src/gateway_controller.rs +++ b/controlplane/src/gateway_controller.rs @@ -15,11 +15,13 @@ limitations under the License. */ use futures::StreamExt; +use log::debug; use std::{ ops::Sub, sync::Arc, time::{Duration, Instant}, }; +use utils::set_condition; use crate::*; use gateway_api::apis::standard::gateways::{Gateway, GatewayStatus}; @@ -74,6 +76,14 @@ pub async fn reconcile(gateway: Arc, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result { + let start = Instant::now(); + let client = ctx.client.clone(); + let name = gateway_class + .metadata + .name + .clone() + .ok_or(Error::InvalidConfigError("no name provided for gatewayclass".to_string()))?; + + let mut gwc = GatewayClass { + metadata: gateway_class.metadata.clone(), + spec: gateway_class.spec.clone(), + status: gateway_class.status.clone(), + // NOTE: Am I missing anything else here? + }; + + if gateway_class.spec.controller_name != GATEWAY_CLASS_CONTROLLER_NAME { + // Skip reconciling because we don't manage this resource + return Ok(Action::await_change()); + } + + if !is_accepted(&gateway_class) { + info!("marking gateway class {:?} as accepted", name); + accept(&mut gwc); + let gatewayclass_api = Api::::all(client); + patch_status(&gatewayclass_api, name, &gwc.status.unwrap_or_default()).await?; + } + + let duration = Instant::now().sub(start); + info!("finished reconciling in {:?} ms", duration.as_millis()); + Ok(Action::await_change()) +} + +pub async fn controller(ctx: Context) -> Result<()> { + let gwc_api = Api::::all(ctx.client.clone()); + gwc_api + .list(&ListParams::default().limit(1)) + .await + .map_err(Error::CRDNotFoundError)?; + + Controller::new(gwc_api, Config::default().any_semantic()) + .shutdown_on_signal() + .run(reconcile, error_policy, Arc::new(ctx)) + .filter_map(|x| async move { std::result::Result::ok(x) }) + .for_each(|_| futures::future::ready(())) + .await; + + Ok(()) +} + +fn error_policy(_: Arc, error: &Error, _: Arc) -> Action { + warn!("reconcile failed: {:?}", error); + Action::requeue(Duration::from_secs(5)) +} diff --git a/controlplane/src/gatewayclass_utils.rs b/controlplane/src/gatewayclass_utils.rs new file mode 100644 index 00000000..ec2a771b --- /dev/null +++ b/controlplane/src/gatewayclass_utils.rs @@ -0,0 +1,77 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use crate::*; +use gateway_api::apis::standard::{ + constants::{GatewayConditionReason, GatewayConditionType}, + gatewayclasses::{GatewayClass, GatewayClassStatus}, +}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; + +use chrono::Utc; +use kube::api::{Api, Patch, PatchParams}; +use serde_json::json; +use utils::set_condition; + +pub fn is_accepted(gateway_class: &GatewayClass) -> bool { + let mut accepted = false; + if let Some(status) = &gateway_class.status { + if let Some(conditions) = &status.conditions { + for condition in conditions { + accepted = condition.type_ == GatewayConditionType::Accepted.to_string() + && condition.status == "True" + } + } + } + accepted +} + +pub fn accept(gateway_class: &mut GatewayClass) { + let now = metav1::Time(Utc::now()); + let accepted = metav1::Condition { + type_: GatewayConditionType::Accepted.to_string(), + status: String::from("True"), + reason: GatewayConditionReason::Accepted.to_string(), + observed_generation: gateway_class.metadata.generation, + last_transition_time: now, + message: String::from("Blixt accepts responsibility for this GatewayClass"), + }; + set_condition(gateway_class, accepted); +} + +pub async fn patch_status( + gatewayclass_api: &Api, + name: String, + status: &GatewayClassStatus, +) -> Result<()> { + let mut conditions = &vec![]; + if let Some(c) = status.conditions.as_ref() { + conditions = c; + } + let patch = Patch::Apply(json!({ + "apiVersion": "gateway.networking.k8s.io/v1", + "kind": "GatewayClass", + "status": { + "conditions": conditions + } + })); + let params = PatchParams::apply(BLIXT_FIELD_MANAGER).force(); + gatewayclass_api + .patch_status(name.as_str(), ¶ms, &patch) + .await + .map_err(Error::KubeError)?; + Ok(()) +} diff --git a/controlplane/src/lib.rs b/controlplane/src/lib.rs index 63f8d6a2..4eff774d 100644 --- a/controlplane/src/lib.rs +++ b/controlplane/src/lib.rs @@ -19,6 +19,10 @@ use thiserror::Error; pub mod gateway_controller; pub mod gateway_utils; +pub mod gatewayclass_controller; +pub mod gatewayclass_utils; +mod traits; +pub mod utils; // Context for our reconciler #[derive(Clone)] diff --git a/controlplane/src/traits.rs b/controlplane/src/traits.rs new file mode 100644 index 00000000..d3f6f728 --- /dev/null +++ b/controlplane/src/traits.rs @@ -0,0 +1,18 @@ +use gateway_api::apis::standard::{gatewayclasses::GatewayClass, gateways::Gateway}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; + +pub trait HasConditions { + fn get_conditions_mut(&mut self) -> &mut Option>; +} + +impl HasConditions for Gateway { + fn get_conditions_mut(&mut self) -> &mut Option> { + &mut self.status.as_mut().unwrap().conditions + } +} + +impl HasConditions for GatewayClass { + fn get_conditions_mut(&mut self) -> &mut Option> { + &mut self.status.as_mut().unwrap().conditions + } +} diff --git a/controlplane/src/utils.rs b/controlplane/src/utils.rs new file mode 100644 index 00000000..8570b5a2 --- /dev/null +++ b/controlplane/src/utils.rs @@ -0,0 +1,27 @@ +use crate::traits::HasConditions; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; + +// Sets the provided condition on any Gateway API object so log as it implements +// the HasConditions trait. +// +// The condition on the object is only updated +// if the new condition has a different status (except for the observed generation which is always +// updated). +pub fn set_condition(obj: &mut T, new_cond: metav1::Condition) { + if let Some(ref mut conditions) = obj.get_conditions_mut() { + for condition in conditions.iter_mut() { + if condition.type_ == new_cond.type_ { + if condition.status == new_cond.status { + // always update the observed generation + condition.observed_generation = new_cond.observed_generation; + return; + } + *condition = new_cond; + return; + } + } + conditions.push(new_cond); + } else { + obj.get_conditions_mut().replace(vec![new_cond]); + } +}