Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): improve param updating process when the cluster version upgrades #8260

Merged
merged 9 commits into from
Mar 3, 2023
138 changes: 100 additions & 38 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
pub mod local_manager;
pub mod reader;

use std::collections::HashSet;
use std::fmt::Debug;
use std::ops::RangeBounds;

Expand All @@ -28,8 +27,11 @@ type Result<T> = core::result::Result<T, SystemParamsError>;

// Only includes undeprecated params.
// Macro input is { field identifier, type, default value }
#[macro_export]
macro_rules! for_all_undeprecated_params {
($macro:ident) => {
($macro:ident
// Hack: match trailing fields to implement `for_all_params`
$(, { $field:ident, $type:ty, $default:expr })*) => {
$macro! {
{ barrier_interval_ms, u32, 1000_u32 },
{ checkpoint_frequency, u64, 10_u64 },
Expand All @@ -40,19 +42,24 @@ macro_rules! for_all_undeprecated_params {
{ data_directory, String, "hummock_001".to_string() },
{ backup_storage_url, String, "memory".to_string() },
{ backup_storage_directory, String, "backup".to_string() },
$({ $field, $type, $default },)*
}
};
}

// Only includes deprecated params. Used to define key constants.
// Includes all params.
// Macro input is { field identifier, type, default value }
macro_rules! for_all_deprecated_params {
macro_rules! for_all_params {
($macro:ident) => {
$macro! {}
for_all_undeprecated_params!(
$macro /* Define future deprecated params here, such as
* ,{ backup_storage_directory, String, "backup".to_string() } */
Comment on lines +55 to +56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really works but seems a little bit strange to pass deprecated params to the macro of undeprecated_params. 🤣

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really can't think of another way to put two for_all_xxx mcros together 😢

);
};
}

/// Convert field name to string.
#[macro_export]
macro_rules! key_of {
($field:ident) => {
stringify!($field)
Expand All @@ -70,13 +77,27 @@ macro_rules! def_key {
};
}

for_all_undeprecated_params!(def_key);
for_all_deprecated_params!(def_key);
for_all_params!(def_key);

// Define default value functions.
macro_rules! def_default {
($({ $field:ident, $type:ty, $default:expr },)*) => {
pub mod default {
$(
pub fn $field() -> $type {
$default
}
)*
}
};
}

for_all_undeprecated_params!(def_default);

// Derive serialization to kv pairs.
macro_rules! impl_system_params_to_kv {
($({ $field:ident, $type:ty, $default:expr },)*) => {
/// All undeprecated fields are guaranteed to be contained in the returned map.
/// The returned map only contains undeprecated fields.
/// Return error if there are missing fields.
pub fn system_params_to_kv(params: &SystemParams) -> Result<Vec<(String, String)>> {
let mut ret = Vec::with_capacity(9);
Expand All @@ -95,45 +116,50 @@ macro_rules! impl_system_params_to_kv {
};
}

macro_rules! impl_derive_missing_fields {
($({ $field:ident, $type:ty, $default:expr },)*) => {
fn derive_missing_fields(params: &mut SystemParams) {
$(
if params.$field.is_none() && let Some(v) = OverrideFromParams::$field(params) {
params.$field = Some(v);
}
)*
}
};
}

// Derive deserialization from kv pairs.
macro_rules! impl_system_params_from_kv {
($({ $field:ident, $type:ty, $default:expr },)*) => {
/// For each field in `SystemParams`, one of these rules apply:
/// - Up-to-date: Guaranteed to be `Some`. If it is not present, may try to derive it from previous
/// versions of this field.
/// - Deprecated: Guaranteed to be `None`.
/// - Unrecognized: Not allowed.
pub fn system_params_from_kv(kvs: Vec<(impl AsRef<[u8]>, impl AsRef<[u8]>)>) -> Result<SystemParams> {
/// Try to deserialize deprecated fields as well.
/// Return error if there are unrecognized fields
pub fn system_params_from_kv<K, V>(mut kvs: Vec<(K, V)>) -> Result<SystemParams>
where
K: AsRef<[u8]> + Debug,
V: AsRef<[u8]> + Debug,
{
let mut ret = SystemParams::default();
let mut expected_keys: HashSet<_> = [
$(key_of!($field),)*
]
.iter()
.cloned()
.collect();
for (k, v) in kvs {
kvs.retain(|(k,v)| {
let k = std::str::from_utf8(k.as_ref()).unwrap();
let v = std::str::from_utf8(v.as_ref()).unwrap();
match k {
$(
key_of!($field) => ret.$field = Some(v.parse().unwrap()),
key_of!($field) => {
ret.$field = Some(v.parse().unwrap());
false
}
)*
_ => {
return Err(format!(
"unrecognized system param {:?}",
k
));
true
}
}
expected_keys.remove(k);
}
if !expected_keys.is_empty() {
return Err(format!(
"missing system param {:?}",
expected_keys
));
});
derive_missing_fields(&mut ret);
if !kvs.is_empty() {
Err(format!("unrecognized system params {:?}", kvs))
} else {
Ok(ret)
}
Ok(ret)
}
};
}
Expand Down Expand Up @@ -170,6 +196,39 @@ macro_rules! impl_default_validation_on_set {
}
}

// Define rules to derive a parameter from others. This is useful for parameter type change or
// semantic change, where a new parameter has to be introduced. When the cluster upgrades to a newer
// version, we need to ensure the effect of the new parameter is equal to its older versions.
//
// For example, if you had `interval_sec` and now you want finer granularity, you can introduce a
// new param `interval_ms` and try to derive it from `interval_sec` by overriding `FromParams` trait
// in `OverrideFromParams`:
//
// ```
// impl FromParams for OverrideFromParams {
// fn interval_ms(params: &mut SystemParams) -> Option<u64> {
// if let Some(sec) = params.interval_sec {
// Some(sec * 1000)
// } else {
// None
// }
// }
// }
// ```
//
// Note that newer versions must be prioritized during derivation.
macro_rules! impl_default_from_other_params {
($({ $field:ident, $type:ty, $default:expr },)*) => {
trait FromParams {
$(
fn $field(_params: &mut SystemParams) -> Option<$type> {
None
}
)*
}
};
}

macro_rules! impl_set_system_param {
($({ $field:ident, $type:ty, $default:expr },)*) => {
pub fn set_system_param(params: &mut SystemParams, key: &str, value: Option<String>) -> Result<()> {
Expand Down Expand Up @@ -211,7 +270,8 @@ macro_rules! impl_default_system_params {
};
}

for_all_undeprecated_params!(impl_system_params_from_kv);
for_all_undeprecated_params!(impl_derive_missing_fields);
for_all_params!(impl_system_params_from_kv);
for_all_undeprecated_params!(impl_system_params_to_kv);
for_all_undeprecated_params!(impl_set_system_param);
for_all_undeprecated_params!(impl_default_validation_on_set);
Expand All @@ -224,6 +284,11 @@ impl ValidateOnSet for OverrideValidateOnSet {
}
}

for_all_undeprecated_params!(impl_default_from_other_params);

struct OverrideFromParams;
impl FromParams for OverrideFromParams {}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -247,9 +312,6 @@ mod tests {
let p = SystemParams::default();
assert!(system_params_to_kv(&p).is_err());

// From kv - missing field.
assert!(system_params_from_kv(vec![(BARRIER_INTERVAL_MS_KEY, "1")]).is_err());

// From kv - unrecognized field.
assert!(system_params_from_kv(vec![("?", "?")]).is_err());

Expand Down
43 changes: 32 additions & 11 deletions src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
// limitations under the License.

pub mod model;

use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::set_system_param;
use risingwave_common::system_param::{default, set_system_param};
use risingwave_common::{for_all_undeprecated_params, key_of};
use risingwave_pb::meta::SystemParams;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -45,13 +47,13 @@ impl<S: MetaStore> SystemParamManager<S> {
let persisted = SystemParams::get(meta_store.as_ref()).await?;

let params = if let Some(persisted) = persisted {
Self::validate_init_params(&persisted, &init_params);
persisted
merge_params(persisted, init_params)
} else {
SystemParams::insert(&init_params, meta_store.as_ref()).await?;
init_params
};

SystemParams::insert(&params, meta_store.as_ref()).await?;

Ok(Self {
meta_store,
notification_manager,
Expand Down Expand Up @@ -89,13 +91,32 @@ impl<S: MetaStore> SystemParamManager<S> {

Ok(())
}
}

fn validate_init_params(persisted: &SystemParams, init: &SystemParams) {
// Only compare params from CLI and config file.
// TODO: Currently all fields are from CLI/config, but after CLI becomes the only source of
// `init`, should only compare them
if persisted != init {
tracing::warn!("System parameters from CLI and config file differ from the persisted")
// For each field in `persisted` and `init`
// 1. Some, None: Params not from CLI need not be validated. Use persisted value.
// 2. Some, Some: Check equality and warn if they differ.
// 3. None, Some: A new version of RW cluster is launched for the first time and newly introduced
// params are not set. Use init value.
// 4. None, None: Same as 3, but the init param is not from CLI. Use default value.
macro_rules! impl_merge_params {
($({ $field:ident, $type:ty, $default:expr },)*) => {
fn merge_params(mut persisted: SystemParams, init: SystemParams) -> SystemParams {
$(
match (persisted.$field.as_ref(), init.$field) {
(Some(persisted), Some(init)) => {
if persisted != &init {
tracing::warn!("System parameters \"{:?}\" from CLI and config file ({}) differ from persisted ({})", key_of!($field), init, persisted);
}
},
(None, Some(init)) => persisted.$field = Some(init),
(None, None) => { persisted.$field = Some(default::$field()) },
_ => {},
}
)*
persisted
}
}
};
}

for_all_undeprecated_params!(impl_merge_params);
1 change: 0 additions & 1 deletion src/meta/src/manager/system_param/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ impl SystemParamsModel for SystemParams {
SYSTEM_PARAMS_CF_NAME.to_string()
}

/// All undeprecated fields are guaranteed to be `Some`.
/// Return error if there are missing or unrecognized fields.
async fn get<S>(store: &S) -> MetadataModelResult<Option<Self>>
where
Expand Down