Skip to content

Commit

Permalink
The PR introduce small API fixes/changes.
Browse files Browse the repository at this point in the history
Main changes:

* Key space notification API will get the keys as `&[u8]` instead of `&str` because key name can be binary.
* ErrorReply was changed to be an enum of `RedisError` or general message so we can return this error type even without having an actual `RedisModuleCallReply` error.
* Introduce `DetachContext` struct which inplements `Sync` and `Send`. This can be used to create a global context for logging only.
* `replicate` can accept also binary data and not only utf8.
* `create_string` can accept binary data and not only utf8.
* `autenticate_user` implementation was fixed to use the correct `RedisModuleAPI`.
* `ThreadSafeContext` lock function was fixed to avoid reuse the context and avoid duplicate context free.
* `RedisModule_Init` was split to `RedisModule_Init` and `RedisModule_InitAPI` so we can intialize the API without register a module. This is usefull for modules that load more plugins and want to intialize the `RedisModuleAPI` on the plugin but without Register it as a another module. We should consider backport this change to Redis.
* Move `init_func` callback after finish registration of commands and configuration so configuration value will be applied when called.
* Introduce `safe_clone` for `RedisModuleString`. In general `RedisModuleString` is none atomic ref counted object. So it is not safe to clone it if Redis GIL is not hold. `safe_clone` gets a context reference which indicating that Redis GIL is held.
* Implement serde serialize and deserialize for `RedisString`
  • Loading branch information
MeirShpilraien committed Apr 1, 2023
1 parent 447f8ab commit bf8d44b
Show file tree
Hide file tree
Showing 15 changed files with 272 additions and 69 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ strum_macros = "0.24"
#failure = "0.1"
backtrace = "0.3"
linkme = "0.3"
serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
anyhow = "1.0.38"
Expand Down
2 changes: 1 addition & 1 deletion examples/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn call_test(ctx: &Context, _: Vec<RedisString>) -> RedisResult {
// test resp3 on call_ext
let call_options = CallOptionsBuilder::new()
.script_mode()
.resp_3(CallOptionResp::Resp3)
.resp(CallOptionResp::Resp3)
.errors_as_replies()
.build();
ctx.call_ext::<_, CallResult>("HSET", &call_options, &["x", "foo", "bar"])
Expand Down
3 changes: 1 addition & 2 deletions examples/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use std::sync::{
use lazy_static::lazy_static;
use redis_module::{
configuration::{ConfigurationContext, ConfigurationFlags},
ConfigurationValue, Context, EnumConfigurationValue, RedisGILGuard, RedisResult, RedisString,
RedisValue,
ConfigurationValue, Context, RedisGILGuard, RedisResult, RedisString, RedisValue,
};

enum_configuration! {
Expand Down
12 changes: 7 additions & 5 deletions examples/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ use std::sync::atomic::{AtomicI64, Ordering};

static NUM_KEY_MISSES: AtomicI64 = AtomicI64::new(0);

fn on_event(ctx: &Context, event_type: NotifyEvent, event: &str, key: &str) {
fn on_event(ctx: &Context, event_type: NotifyEvent, event: &str, key: &[u8]) {
let msg = format!(
"Received event: {:?} on key: {} via event: {}",
event_type, key, event
event_type,
std::str::from_utf8(key).unwrap(),
event
);
ctx.log_debug(msg.as_str());
}

fn on_stream(ctx: &Context, _event_type: NotifyEvent, _event: &str, _key: &str) {
fn on_stream(ctx: &Context, _event_type: NotifyEvent, _event: &str, _key: &[u8]) {
ctx.log_debug("Stream event received!");
}

Expand All @@ -34,7 +36,7 @@ fn event_send(ctx: &Context, args: Vec<RedisString>) -> RedisResult {
}
}

fn on_key_miss(_ctx: &Context, _event_type: NotifyEvent, _event: &str, _key: &str) {
fn on_key_miss(_ctx: &Context, _event_type: NotifyEvent, _event: &str, _key: &[u8]) {
NUM_KEY_MISSES.fetch_add(1, Ordering::SeqCst);
}

Expand All @@ -56,7 +58,7 @@ redis_module! {
[@EXPIRED @EVICTED: on_event],
[@STREAM: on_stream],
[@MISSED: on_key_miss],
]
],
}

//////////////////////////////////////////////////////
Expand Down
12 changes: 7 additions & 5 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ macro_rules! enum_configuration {
($(#[$meta:meta])* $vis:vis enum $name:ident {
$($(#[$vmeta:meta])* $vname:ident = $val:expr,)*
}) => {
use $crate::configuration::EnumConfigurationValue;
$(#[$meta])*
$vis enum $name {
$($(#[$vmeta])* $vname = $val,)*
Expand Down Expand Up @@ -140,7 +141,7 @@ impl ConfigurationValue<i64> for AtomicI64 {
impl ConfigurationValue<RedisString> for RedisGILGuard<String> {
fn get(&self, ctx: &ConfigurationContext) -> RedisString {
let value = self.lock(ctx);
RedisString::create(None, &value)
RedisString::create(None, value.as_str())
}
fn set(&self, ctx: &ConfigurationContext, val: RedisString) -> Result<(), RedisError> {
let mut value = self.lock(ctx);
Expand All @@ -152,7 +153,7 @@ impl ConfigurationValue<RedisString> for RedisGILGuard<String> {
impl ConfigurationValue<RedisString> for Mutex<String> {
fn get(&self, _ctx: &ConfigurationContext) -> RedisString {
let value = self.lock().unwrap();
RedisString::create(None, &value)
RedisString::create(None, value.as_str())
}
fn set(&self, _ctx: &ConfigurationContext, val: RedisString) -> Result<(), RedisError> {
let mut value = self.lock().unwrap();
Expand Down Expand Up @@ -184,7 +185,7 @@ impl<G, T: ConfigurationValue<G> + 'static> ConfigrationPrivateData<G, T> {
// we know the GIL is held so it is safe to use Context::dummy().
let configuration_ctx = ConfigurationContext::new();
if let Err(e) = self.variable.set(&configuration_ctx, val) {
let error_msg = RedisString::create(None, &e.to_string());
let error_msg = RedisString::create(None, e.to_string().as_str());
unsafe { *err = error_msg.take() };
return raw::REDISMODULE_ERR as i32;
}
Expand Down Expand Up @@ -366,7 +367,7 @@ extern "C" fn enum_configuration_set<
match val {
Ok(val) => private_data.set_val(name, val, err),
Err(e) => {
let error_msg = RedisString::create(None, &e.to_string());
let error_msg = RedisString::create(None, e.to_string().as_str());
unsafe { *err = error_msg.take() };
raw::REDISMODULE_ERR as i32
}
Expand Down Expand Up @@ -427,7 +428,7 @@ pub fn register_enum_configuration<G: EnumConfigurationValue, T: ConfigurationVa

pub fn apply_module_args_as_configuration(
ctx: &Context,
mut args: Vec<RedisString>,
args: &[RedisString],
) -> Result<(), RedisError> {
if args.len() == 0 {
return Ok(());
Expand All @@ -437,6 +438,7 @@ pub fn apply_module_args_as_configuration(
"Arguments lenght is not devided by 2 (require to be read as module configuration).",
));
}
let mut args = args.to_vec();
args.insert(0, ctx.create_string("set"));
ctx.call(
"config",
Expand Down
43 changes: 32 additions & 11 deletions src/context/call_reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub struct ErrorCallReply<'root> {
_dummy: PhantomData<&'root ()>,
}

pub enum ErrorReply<'root> {
Msg(String),
RedisError(ErrorCallReply<'root>),
}

impl<'root> ErrorCallReply<'root> {
/// Convert ErrorCallReply to String.
/// Return None data is not a valid utf8.
Expand All @@ -59,7 +64,26 @@ impl<'root> ErrorCallReply<'root> {
}
}

impl<'root> Debug for ErrorCallReply<'root> {
impl<'root> ErrorReply<'root> {
/// Convert ErrorCallReply to String.
/// Return None data is not a valid utf8.
pub fn to_string(&self) -> Option<String> {
match self {
ErrorReply::Msg(s) => Some(s.clone()),
ErrorReply::RedisError(r) => r.to_string(),
}
}

/// Return the ErrorCallReply data as &[u8]
pub fn as_bytes(&self) -> &[u8] {
match self {
ErrorReply::Msg(s) => s.as_bytes(),
ErrorReply::RedisError(r) => r.as_bytes(),
}
}
}

impl<'root> Debug for ErrorReply<'root> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
Expand Down Expand Up @@ -332,16 +356,13 @@ impl<'root> VerbatimStringCallReply<'root> {
/// Return None if the format is not a valid utf8.
pub fn as_parts(&self) -> Option<(&str, &[u8])> {
let mut len: usize = 0;
let format: *const u8 = std::ptr::null();
let mut format: *const c_char = std::ptr::null();
let reply_string: *mut u8 = unsafe {
RedisModule_CallReplyVerbatim.unwrap()(
self.reply.as_ptr(),
&mut len,
&mut (format as *const c_char),
) as *mut u8
RedisModule_CallReplyVerbatim.unwrap()(self.reply.as_ptr(), &mut len, &mut format)
as *mut u8
};
Some((
std::str::from_utf8(unsafe { slice::from_raw_parts(format, 3) })
std::str::from_utf8(unsafe { slice::from_raw_parts(format as *const u8, 3) })
.ok()
.unwrap(),
unsafe { slice::from_raw_parts(reply_string, len) },
Expand Down Expand Up @@ -381,10 +402,10 @@ fn create_call_reply<'root>(reply: NonNull<RedisModuleCallReply>) -> CallResult<
reply: reply,
_dummy: PhantomData,
})),
ReplyType::Error => Err(ErrorCallReply {
ReplyType::Error => Err(ErrorReply::RedisError(ErrorCallReply {
reply: reply,
_dummy: PhantomData,
}),
})),
ReplyType::Array => Ok(CallReply::Array(ArrayCallReply {
reply: reply,
_dummy: PhantomData,
Expand Down Expand Up @@ -426,4 +447,4 @@ pub(crate) fn create_root_call_reply<'root>(
reply.map_or(Ok(CallReply::Unknown), |v| create_call_reply(v))
}

pub type CallResult<'root> = Result<CallReply<'root>, ErrorCallReply<'root>>;
pub type CallResult<'root> = Result<CallReply<'root>, ErrorReply<'root>>;
107 changes: 91 additions & 16 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bitflags::bitflags;
use std::borrow::Borrow;
use std::cell::UnsafeCell;
use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_long, c_longlong};
use std::ptr::{self, NonNull};
Expand Down Expand Up @@ -37,6 +37,7 @@ pub struct CallOptionsBuilder {
options: String,
}

#[derive(Clone)]
pub struct CallOptions {
options: CString,
}
Expand Down Expand Up @@ -100,7 +101,7 @@ impl CallOptionsBuilder {
}

/// Allow control the protocol version in which the replies will be returned.
pub fn resp_3(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
pub fn resp(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
match resp {
CallOptionResp::Auto => self.add_flag("0"),
CallOptionResp::Resp2 => (),
Expand All @@ -117,12 +118,80 @@ impl CallOptionsBuilder {
}
}

/// This struct allows logging when the Redis GIL is not acquired.
/// It is implemented `Send` and `Sync` so it can safely be used
/// from within different threads.
pub struct DetachContext {
ctx: UnsafeCell<Option<NonNull<raw::RedisModuleCtx>>>,
}

impl Default for DetachContext {
fn default() -> Self {
DetachContext {
ctx: UnsafeCell::new(None),
}
}
}

impl DetachContext {
pub fn log(&self, level: LogLevel, message: &str) {
let c = unsafe { &*self.ctx.get() };
crate::logging::log_internal(c.map_or(ptr::null_mut(), |v| v.as_ptr()), level, message);
}

pub fn log_debug(&self, message: &str) {
self.log(LogLevel::Debug, message);
}

pub fn log_notice(&self, message: &str) {
self.log(LogLevel::Notice, message);
}

pub fn log_verbose(&self, message: &str) {
self.log(LogLevel::Verbose, message);
}

pub fn log_warning(&self, message: &str) {
self.log(LogLevel::Warning, message);
}

pub fn set_context(&self, ctx: &Context) {
let curr = unsafe { &mut *self.ctx.get() };
let ctx = unsafe { raw::RedisModule_GetDetachedThreadSafeContext.unwrap()(ctx.ctx) };
*curr = NonNull::new(ctx);
}
}

unsafe impl Send for DetachContext {}
unsafe impl Sync for DetachContext {}

/// `Context` is a structure that's designed to give us a high-level interface to
/// the Redis module API by abstracting away the raw C FFI calls.
pub struct Context {
pub ctx: *mut raw::RedisModuleCtx,
}

/// A guerd that protected a user that has
/// been set on a context using `autenticate_user`.
/// This guerd make sure to unset the user when freed.
/// It prevent privilege escalation security issues
/// that can happened by forgeting to unset the user.
pub struct ContextUserScope<'ctx> {
ctx: &'ctx Context,
}

impl<'ctx> Drop for ContextUserScope<'ctx> {
fn drop(&mut self) {
self.ctx.deautenticate_user();
}
}

impl<'ctx> ContextUserScope<'ctx> {
fn new(ctx: &'ctx Context) -> ContextUserScope<'ctx> {
ContextUserScope { ctx }
}
}

pub struct StrCallArgs<'a> {
is_owner: bool,
args: Vec<*mut raw::RedisModuleString>,
Expand Down Expand Up @@ -173,7 +242,7 @@ where
}

impl<'a> StrCallArgs<'a> {
fn args_mut(&mut self) -> &mut [*mut raw::RedisModuleString] {
pub(crate) fn args_mut(&mut self) -> &mut [*mut raw::RedisModuleString] {
&mut self.args
}
}
Expand Down Expand Up @@ -422,8 +491,13 @@ impl Context {
raw::replicate_verbatim(self.ctx);
}

/// Replicate command to the replica and AOF.
pub fn replicate<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) {
raw::replicate(self.ctx, command, args);
}

#[must_use]
pub fn create_string(&self, s: &str) -> RedisString {
pub fn create_string<T: Into<Vec<u8>>>(&self, s: T) -> RedisString {
RedisString::create(NonNull::new(self.ctx), s)
}

Expand Down Expand Up @@ -534,19 +608,20 @@ impl Context {
/// Attach the given user to the current context so each operation performed from
/// now on using this context will be validated againts this new user.
/// Return Status::Ok on success and Status::Err or failure.
pub fn autenticate_user<T: Borrow<[u8]>>(&self, user_name: T) -> raw::Status {
let user_name_blob: &[u8] = user_name.borrow();
unsafe {
raw::RedisModule_AuthenticateClientWithACLUser.unwrap()(
self.ctx,
user_name_blob.as_ptr() as *const c_char,
user_name_blob.len(),
None,
ptr::null_mut(),
ptr::null_mut(),
)
pub fn autenticate_user(
&self,
user_name: &RedisString,
) -> Result<ContextUserScope<'_>, RedisError> {
let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
if user.is_null() {
return Err(RedisError::Str("User does not exists or disabled"));
}
.into()
unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, user) };
Ok(ContextUserScope::new(self))
}

fn deautenticate_user(&self) {
unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, ptr::null_mut()) };
}

/// Verify the the given user has the give ACL permission on the given key.
Expand Down
3 changes: 2 additions & 1 deletion src/context/thread_safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ impl<B: Send> ThreadSafeContext<B> {
/// similar to `std::sync::Mutex`.
pub fn lock(&self) -> ContextGuard {
unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(self.ctx) };
let ctx = Context::new(self.ctx);
let ctx = unsafe { raw::RedisModule_GetThreadSafeContext.unwrap()(ptr::null_mut()) };
let ctx = Context::new(ctx);
ContextGuard { ctx }
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/include/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ REDISMODULE_API int (*RedisModule_LoadConfigs)(RedisModuleCtx *ctx) REDISMODULE_

/* This is included inline inside each Redis module. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) REDISMODULE_ATTR_UNUSED;
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
static void RedisModule_InitAPI(RedisModuleCtx *ctx) {
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
REDISMODULE_GET_API(Alloc);
Expand Down Expand Up @@ -1547,7 +1547,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RegisterStringConfig);
REDISMODULE_GET_API(RegisterEnumConfig);
REDISMODULE_GET_API(LoadConfigs);
}

static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
RedisModule_InitAPI(ctx);
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
return REDISMODULE_OK;
Expand Down
Loading

0 comments on commit bf8d44b

Please sign in to comment.