Skip to content

Commit

Permalink
Added config changes server event (#343)
Browse files Browse the repository at this point in the history
Added config changes server event
  • Loading branch information
MeirShpilraien authored Jun 12, 2023
1 parent 4eb77dc commit 4f82bce
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 1 deletion.
18 changes: 17 additions & 1 deletion examples/server_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::sync::atomic::{AtomicI64, Ordering};
use redis_module::{
redis_module, server_events::FlushSubevent, Context, RedisResult, RedisString, RedisValue,
};
use redis_module_macros::flush_event_handler;
use redis_module_macros::{config_changed_event_handler, flush_event_handler};

static NUM_FLUSHES: AtomicI64 = AtomicI64::new(0);
static NUM_MAX_MEMORY_CONFIGURATION_CHANGES: AtomicI64 = AtomicI64::new(0);

#[flush_event_handler]
fn flushed_event_handler(_ctx: &Context, flush_event: FlushSubevent) {
Expand All @@ -14,10 +15,24 @@ fn flushed_event_handler(_ctx: &Context, flush_event: FlushSubevent) {
}
}

#[config_changed_event_handler]
fn config_changed_event_handler(_ctx: &Context, changed_configs: &[&str]) {
changed_configs
.iter()
.find(|v| **v == "maxmemory")
.map(|_| NUM_MAX_MEMORY_CONFIGURATION_CHANGES.fetch_add(1, Ordering::SeqCst));
}

fn num_flushed(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
Ok(RedisValue::Integer(NUM_FLUSHES.load(Ordering::SeqCst)))
}

fn num_maxmemory_changes(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
Ok(RedisValue::Integer(
NUM_MAX_MEMORY_CONFIGURATION_CHANGES.load(Ordering::SeqCst),
))
}

//////////////////////////////////////////////////////

redis_module! {
Expand All @@ -27,5 +42,6 @@ redis_module! {
data_types: [],
commands: [
["num_flushed", num_flushed, "readonly", 0, 0, 0],
["num_max_memory_changes", num_maxmemory_changes, "readonly", 0, 0, 0],
],
}
59 changes: 59 additions & 0 deletions redismodule-rs-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ pub fn command(attr: TokenStream, item: TokenStream) -> TokenStream {
command::redis_command(attr, item)
}

/// Proc macro which is set on a function that need to be called whenever the server role changes.
/// The function must accept a [Context] and [ServerRole].
///
/// Example:
///
/// ```rust,no_run,ignore
/// #[role_changed_event_handler]
/// fn role_changed_event_handler(ctx: &Context, values: ServerRole) { ... }
/// ```
#[proc_macro_attribute]
pub fn role_changed_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Expand All @@ -97,6 +106,15 @@ pub fn role_changed_event_handler(_attr: TokenStream, item: TokenStream) -> Toke
gen.into()
}

/// Proc macro which is set on a function that need to be called whenever a loading event happened.
/// The function must accept a [Context] and [LoadingSubevent].
///
/// Example:
///
/// ```rust,no_run,ignore
/// #[loading_event_handler]
/// fn loading_event_handler(ctx: &Context, values: LoadingSubevent) { ... }
/// ```
#[proc_macro_attribute]
pub fn loading_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Expand All @@ -110,6 +128,15 @@ pub fn loading_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStre
gen.into()
}

/// Proc macro which is set on a function that need to be called whenever a flush event happened.
/// The function must accept a [Context] and [FlushSubevent].
///
/// Example:
///
/// ```rust,no_run,ignore
/// #[flush_event_handler]
/// fn flush_event_handler(ctx: &Context, values: FlushSubevent) { ... }
/// ```
#[proc_macro_attribute]
pub fn flush_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Expand All @@ -123,6 +150,15 @@ pub fn flush_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream
gen.into()
}

/// Proc macro which is set on a function that need to be called whenever a module is loaded or unloaded on the server.
/// The function must accept a [Context] and [ModuleChangeSubevent].
///
/// Example:
///
/// ```rust,no_run,ignore
/// #[module_changed_event_handler]
/// fn module_changed_event_handler(ctx: &Context, values: ModuleChangeSubevent) { ... }
/// ```
#[proc_macro_attribute]
pub fn module_changed_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Expand All @@ -136,6 +172,29 @@ pub fn module_changed_event_handler(_attr: TokenStream, item: TokenStream) -> To
gen.into()
}

/// Proc macro which is set on a function that need to be called whenever a configuration change
/// event is happening. The function must accept a [Context] and [&[&str]] that contains the names
/// of the configiration values that was changed.
///
/// Example:
///
/// ```rust,no_run,ignore
/// #[config_changed_event_handler]
/// fn configuration_changed_event_handler(ctx: &Context, values: &[&str]) { ... }
/// ```
#[proc_macro_attribute]
pub fn config_changed_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
let ast: ItemFn = match syn::parse(item) {
Ok(res) => res,
Err(e) => return e.to_compile_error().into(),
};
let gen = quote! {
#[linkme::distributed_slice(redis_module::server_events::CONFIG_CHANGED_SERVER_EVENTS_LIST)]
#ast
};
gen.into()
}

/// The macro auto generate a [From] implementation that can convert the struct into [RedisValue].
///
/// Example:
Expand Down
41 changes: 41 additions & 0 deletions src/context/server_events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::ffi::CStr;

use crate::raw;
use crate::{context::Context, RedisError};
use linkme::distributed_slice;
Expand Down Expand Up @@ -49,6 +51,9 @@ pub static FLUSH_SERVER_EVENTS_LIST: [fn(&Context, FlushSubevent)] = [..];
#[distributed_slice()]
pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent)] = [..];

#[distributed_slice()]
pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];

extern "C" fn role_changed_callback(
ctx: *mut raw::RedisModuleCtx,
_eid: raw::RedisModuleEvent,
Expand Down Expand Up @@ -121,6 +126,36 @@ extern "C" fn module_change_event_callback(
});
}

extern "C" fn config_change_event_callback(
ctx: *mut raw::RedisModuleCtx,
_eid: raw::RedisModuleEvent,
_subevent: u64,
data: *mut ::std::os::raw::c_void,
) {
let data: &raw::RedisModuleConfigChange =
unsafe { &*(data as *mut raw::RedisModuleConfigChange) };
let config_names: Vec<_> = (0..data.num_changes)
.into_iter()
.map(|i| unsafe {
let name = *data.config_names.offset(i as isize) as *mut i8;
CStr::from_ptr(name)
})
.collect();
let config_names: Vec<_> = config_names
.iter()
.map(|v| {
v.to_str()
.expect("Got a configuration name which is not a valid utf8")
})
.collect();
let ctx = Context::new(ctx);
CONFIG_CHANGED_SERVER_EVENTS_LIST
.iter()
.for_each(|callback| {
callback(&ctx, config_names.as_slice());
});
}

fn register_single_server_event_type<T>(
ctx: &Context,
callbacks: &[fn(&Context, T)],
Expand Down Expand Up @@ -171,5 +206,11 @@ pub fn register_server_events(ctx: &Context) -> Result<(), RedisError> {
raw::REDISMODULE_EVENT_MODULE_CHANGE,
Some(module_change_event_callback),
)?;
register_single_server_event_type(
ctx,
&CONFIG_CHANGED_SERVER_EVENTS_LIST,
raw::REDISMODULE_EVENT_CONFIG,
Some(config_change_event_callback),
)?;
Ok(())
}
18 changes: 18 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,24 @@ fn test_server_event() -> Result<()> {

assert_eq!(res, 2);

redis::cmd("config")
.arg(&["set", "maxmemory", "1"])
.query(&mut con)
.with_context(|| "failed to run string.set")?;

let res: i64 = redis::cmd("num_max_memory_changes").query(&mut con)?;

assert_eq!(res, 1);

redis::cmd("config")
.arg(&["set", "maxmemory", "0"])
.query(&mut con)
.with_context(|| "failed to run string.set")?;

let res: i64 = redis::cmd("num_max_memory_changes").query(&mut con)?;

assert_eq!(res, 2);

Ok(())
}

Expand Down

0 comments on commit 4f82bce

Please sign in to comment.