Skip to content

Commit

Permalink
feat(meta): improve param updating process when the cluster version u…
Browse files Browse the repository at this point in the history
…pgrades (#8260)

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
Gun9niR and mergify[bot] authored Mar 3, 2023
1 parent d85c631 commit 870cbf7
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 59 deletions.
155 changes: 108 additions & 47 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
pub mod local_manager;
pub mod reader;

use std::collections::HashSet;
use std::fmt::Debug;
use std::ops::RangeBounds;

Expand All @@ -26,10 +25,13 @@ pub type SystemParamsError = String;

type Result<T> = core::result::Result<T, SystemParamsError>;

// Only includes undeprecated params.
// Macro input is { field identifier, type, default value }
/// Only includes undeprecated params.
/// Macro input is { field identifier, type, default value }
#[macro_export]
macro_rules! for_all_undeprecated_params {
($macro:ident) => {
($macro:ident
// Hack: match trailing fields to implement `for_all_params`
$(, { $field:ident, $type:ty, $default:expr })*) => {
$macro! {
{ barrier_interval_ms, u32, 1000_u32 },
{ checkpoint_frequency, u64, 10_u64 },
Expand All @@ -40,26 +42,31 @@ macro_rules! for_all_undeprecated_params {
{ data_directory, String, "hummock_001".to_string() },
{ backup_storage_url, String, "memory".to_string() },
{ backup_storage_directory, String, "backup".to_string() },
$({ $field, $type, $default },)*
}
};
}

// Only includes deprecated params. Used to define key constants.
// Macro input is { field identifier, type, default value }
macro_rules! for_all_deprecated_params {
/// Includes all params.
/// Macro input is { field identifier, type, default value }
macro_rules! for_all_params {
($macro:ident) => {
$macro! {}
for_all_undeprecated_params!(
$macro /* Define future deprecated params here, such as
* ,{ backup_storage_directory, String, "backup".to_string() } */
);
};
}

/// Convert field name to string.
#[macro_export]
macro_rules! key_of {
($field:ident) => {
stringify!($field)
};
}

// Define key constants for fields in `SystemParams` for use of other modules.
/// Define key constants for fields in `SystemParams` for use of other modules.
macro_rules! def_key {
($({ $field:ident, $type:ty, $default:expr },)*) => {
paste! {
Expand All @@ -70,13 +77,27 @@ macro_rules! def_key {
};
}

for_all_undeprecated_params!(def_key);
for_all_deprecated_params!(def_key);
for_all_params!(def_key);

/// Define default value functions.
macro_rules! def_default {
($({ $field:ident, $type:ty, $default:expr },)*) => {
pub mod default {
$(
pub fn $field() -> $type {
$default
}
)*
}
};
}

for_all_undeprecated_params!(def_default);

// Derive serialization to kv pairs.
/// Derive serialization to kv pairs.
macro_rules! impl_system_params_to_kv {
($({ $field:ident, $type:ty, $default:expr },)*) => {
/// All undeprecated fields are guaranteed to be contained in the returned map.
/// The returned map only contains undeprecated fields.
/// Return error if there are missing fields.
pub fn system_params_to_kv(params: &SystemParams) -> Result<Vec<(String, String)>> {
let mut ret = Vec::with_capacity(9);
Expand All @@ -95,52 +116,57 @@ macro_rules! impl_system_params_to_kv {
};
}

// Derive deserialization from kv pairs.
macro_rules! impl_derive_missing_fields {
($({ $field:ident, $type:ty, $default:expr },)*) => {
fn derive_missing_fields(params: &mut SystemParams) {
$(
if params.$field.is_none() && let Some(v) = OverrideFromParams::$field(params) {
params.$field = Some(v);
}
)*
}
};
}

/// Derive deserialization from kv pairs.
macro_rules! impl_system_params_from_kv {
($({ $field:ident, $type:ty, $default:expr },)*) => {
/// For each field in `SystemParams`, one of these rules apply:
/// - Up-to-date: Guaranteed to be `Some`. If it is not present, may try to derive it from previous
/// versions of this field.
/// - Deprecated: Guaranteed to be `None`.
/// - Unrecognized: Not allowed.
pub fn system_params_from_kv(kvs: Vec<(impl AsRef<[u8]>, impl AsRef<[u8]>)>) -> Result<SystemParams> {
/// Try to deserialize deprecated fields as well.
/// Return error if there are unrecognized fields.
pub fn system_params_from_kv<K, V>(mut kvs: Vec<(K, V)>) -> Result<SystemParams>
where
K: AsRef<[u8]> + Debug,
V: AsRef<[u8]> + Debug,
{
let mut ret = SystemParams::default();
let mut expected_keys: HashSet<_> = [
$(key_of!($field),)*
]
.iter()
.cloned()
.collect();
for (k, v) in kvs {
kvs.retain(|(k,v)| {
let k = std::str::from_utf8(k.as_ref()).unwrap();
let v = std::str::from_utf8(v.as_ref()).unwrap();
match k {
$(
key_of!($field) => ret.$field = Some(v.parse().unwrap()),
key_of!($field) => {
ret.$field = Some(v.parse().unwrap());
false
}
)*
_ => {
return Err(format!(
"unrecognized system param {:?}",
k
));
true
}
}
expected_keys.remove(k);
}
if !expected_keys.is_empty() {
return Err(format!(
"missing system param {:?}",
expected_keys
));
});
derive_missing_fields(&mut ret);
if !kvs.is_empty() {
Err(format!("unrecognized system params {:?}", kvs))
} else {
Ok(ret)
}
Ok(ret)
}
};
}

// Define check rules when a field is changed. By default all fields are immutable.
// If you want custom rules, please override the default implementation in
// `OverrideValidateOnSet` below.
/// Define check rules when a field is changed. By default all fields are immutable.
/// If you want custom rules, please override the default implementation in
/// `OverrideValidateOnSet` below.
macro_rules! impl_default_validation_on_set {
($({ $field:ident, $type:ty, $default:expr },)*) => {
#[allow(clippy::ptr_arg)]
Expand Down Expand Up @@ -170,6 +196,38 @@ macro_rules! impl_default_validation_on_set {
}
}

/// Define rules to derive a parameter from others. This is useful for parameter type change or
/// semantic change, where a new parameter has to be introduced. When the cluster upgrades to a
/// newer version, we need to ensure the effect of the new parameter is equal to its older versions.
/// For example, if you had `interval_sec` and now you want finer granularity, you can introduce a
/// new param `interval_ms` and try to derive it from `interval_sec` by overriding `FromParams`
/// trait in `OverrideFromParams`:
///
/// ```ignore
/// impl FromParams for OverrideFromParams {
/// fn interval_ms(params: &SystemParams) -> Option<u64> {
/// if let Some(sec) = params.interval_sec {
/// Some(sec * 1000)
/// } else {
/// None
/// }
/// }
/// }
/// ```
///
/// Note that newer versions must be prioritized during derivation.
macro_rules! impl_default_from_other_params {
($({ $field:ident, $type:ty, $default:expr },)*) => {
trait FromParams {
$(
fn $field(_params: &SystemParams) -> Option<$type> {
None
}
)*
}
};
}

macro_rules! impl_set_system_param {
($({ $field:ident, $type:ty, $default:expr },)*) => {
pub fn set_system_param(params: &mut SystemParams, key: &str, value: Option<String>) -> Result<()> {
Expand Down Expand Up @@ -211,7 +269,8 @@ macro_rules! impl_default_system_params {
};
}

for_all_undeprecated_params!(impl_system_params_from_kv);
for_all_undeprecated_params!(impl_derive_missing_fields);
for_all_params!(impl_system_params_from_kv);
for_all_undeprecated_params!(impl_system_params_to_kv);
for_all_undeprecated_params!(impl_set_system_param);
for_all_undeprecated_params!(impl_default_validation_on_set);
Expand All @@ -224,6 +283,11 @@ impl ValidateOnSet for OverrideValidateOnSet {
}
}

for_all_undeprecated_params!(impl_default_from_other_params);

struct OverrideFromParams;
impl FromParams for OverrideFromParams {}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -247,9 +311,6 @@ mod tests {
let p = SystemParams::default();
assert!(system_params_to_kv(&p).is_err());

// From kv - missing field.
assert!(system_params_from_kv(vec![(BARRIER_INTERVAL_MS_KEY, "1")]).is_err());

// From kv - unrecognized field.
assert!(system_params_from_kv(vec![("?", "?")]).is_err());

Expand Down
43 changes: 32 additions & 11 deletions src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
// limitations under the License.

pub mod model;

use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::set_system_param;
use risingwave_common::system_param::{default, set_system_param};
use risingwave_common::{for_all_undeprecated_params, key_of};
use risingwave_pb::meta::SystemParams;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -45,13 +47,13 @@ impl<S: MetaStore> SystemParamManager<S> {
let persisted = SystemParams::get(meta_store.as_ref()).await?;

let params = if let Some(persisted) = persisted {
Self::validate_init_params(&persisted, &init_params);
persisted
merge_params(persisted, init_params)
} else {
SystemParams::insert(&init_params, meta_store.as_ref()).await?;
init_params
};

SystemParams::insert(&params, meta_store.as_ref()).await?;

Ok(Self {
meta_store,
notification_manager,
Expand Down Expand Up @@ -89,13 +91,32 @@ impl<S: MetaStore> SystemParamManager<S> {

Ok(())
}
}

fn validate_init_params(persisted: &SystemParams, init: &SystemParams) {
// Only compare params from CLI and config file.
// TODO: Currently all fields are from CLI/config, but after CLI becomes the only source of
// `init`, should only compare them
if persisted != init {
tracing::warn!("System parameters from CLI and config file differ from the persisted")
// For each field in `persisted` and `init`
// 1. Some, None: Params not from CLI need not be validated. Use persisted value.
// 2. Some, Some: Check equality and warn if they differ.
// 3. None, Some: A new version of RW cluster is launched for the first time and newly introduced
// params are not set. Use init value.
// 4. None, None: Same as 3, but the init param is not from CLI. Use default value.
macro_rules! impl_merge_params {
($({ $field:ident, $type:ty, $default:expr },)*) => {
fn merge_params(mut persisted: SystemParams, init: SystemParams) -> SystemParams {
$(
match (persisted.$field.as_ref(), init.$field) {
(Some(persisted), Some(init)) => {
if persisted != &init {
tracing::warn!("System parameters \"{:?}\" from CLI and config file ({}) differ from persisted ({})", key_of!($field), init, persisted);
}
},
(None, Some(init)) => persisted.$field = Some(init),
(None, None) => { persisted.$field = Some(default::$field()) },
_ => {},
}
)*
persisted
}
}
};
}

for_all_undeprecated_params!(impl_merge_params);
1 change: 0 additions & 1 deletion src/meta/src/manager/system_param/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ impl SystemParamsModel for SystemParams {
SYSTEM_PARAMS_CF_NAME.to_string()
}

/// All undeprecated fields are guaranteed to be `Some`.
/// Return error if there are missing or unrecognized fields.
async fn get<S>(store: &S) -> MetadataModelResult<Option<Self>>
where
Expand Down

0 comments on commit 870cbf7

Please sign in to comment.