From a5458b89f9212b252a621f6c00c0875e055303e8 Mon Sep 17 00:00:00 2001 From: terkwood <38859656+Terkwood@users.noreply.github.com> Date: Sat, 16 May 2020 18:40:26 -0400 Subject: [PATCH] Add redis streams capabilities --- Cargo.toml | 2 +- src/commands.rs | 664 +++++++++++++++++++++++++++++++++++++++++- src/streams.rs | 658 +++++++++++++++++++++++++++++++++++++++++ tests/test_streams.rs | 568 ++++++++++++++++++++++++++++++++++++ 4 files changed, 1890 insertions(+), 2 deletions(-) create mode 100644 tests/test_streams.rs diff --git a/Cargo.toml b/Cargo.toml index caf274a30..1f394fd16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ async-std = { version = "1.5.0", optional = true} async-trait = "0.1.24" [features] -default = ["geospatial", "tokio-comp", "async-std-comp", "script"] +default = ["streams", "geospatial", "tokio-comp", "async-std-comp", "script"] aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/sink", "tokio/sync", "tokio/stream", "tokio/tcp", "tokio/uds", "tokio/io-util", "tokio-util", "tokio-util/codec", "combine/tokio-02"] tokio-rt-core = ["tokio-comp", "tokio/rt-core"] geospatial = [] diff --git a/src/commands.rs b/src/commands.rs index 04ad0c9f2..20edd265a 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -7,6 +7,9 @@ use crate::cmd::{cmd, Cmd, Pipeline, Iter}; #[cfg(feature = "geospatial")] use crate::geo; +#[cfg(feature = "streams")] +use crate::streams; + macro_rules! implement_commands { ( $lifetime: lifetime @@ -834,7 +837,9 @@ implement_commands! { cmd("PUBLISH").arg(channel).arg(message) } + // // geospatial commands + // /// Adds the specified geospatial items to the specified key. /// @@ -1032,9 +1037,666 @@ implement_commands! { .arg(options) } - + // // streams commands + // + + /// Ack pending stream messages checked out by a consumer. + /// + /// ```text + /// XACK ... + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xack( + key: K, + group: G, + ids: &'a [I]) { + cmd("XACK") + .arg(key) + .arg(group) + .arg(ids) + } + + + /// Add a stream message by `key`. Use `*` as the `id` for the current timestamp. + /// + /// ```text + /// XADD key [field value] [field value] ... + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xadd( + key: K, + id: ID, + items: &'a [(F, V)] + ) { + cmd("XADD").arg(key).arg(id).arg(items) + } + + + /// BTreeMap variant for adding a stream message by `key`. + /// Use `*` as the `id` for the current timestamp. + /// + /// ```text + /// XADD key [rust BTreeMap] ... + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xadd_map( + key: K, + id: ID, + map: BTM + ) { + cmd("XADD").arg(key).arg(id).arg(map) + } + + /// Add a stream message while capping the stream at a maxlength. + /// + /// ```text + /// XADD key [MAXLEN [~|=] ] [field value] [field value] ... + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xadd_maxlen< + K: ToRedisArgs, + ID: ToRedisArgs, + F: ToRedisArgs, + V: ToRedisArgs + >( + key: K, + maxlen: streams::StreamMaxlen, + id: ID, + items: &'a [(F, V)] + ) { + cmd("XADD") + .arg(key) + .arg(maxlen) + .arg(id) + .arg(items) + } + + + /// BTreeMap variant for adding a stream message while capping the stream at a maxlength. + /// + /// ```text + /// XADD key [MAXLEN [~|=] ] [rust BTreeMap] ... 
+ /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xadd_maxlen_map( + key: K, + maxlen: streams::StreamMaxlen, + id: ID, + map: BTM + ) { + cmd("XADD") + .arg(key) + .arg(maxlen) + .arg(id) + .arg(map) + } + + + /// Claim pending, unacked messages, after some period of time, + /// currently checked out by another consumer. + /// + /// This method only accepts the must-have arguments for claiming messages. + /// If optional arguments are required, see `xclaim_options` below. + /// + /// ```text + /// XCLAIM [ ] + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xclaim( + key: K, + group: G, + consumer: C, + min_idle_time: MIT, + ids: &'a [ID] + ) { + cmd("XCLAIM") + .arg(key) + .arg(group) + .arg(consumer) + .arg(min_idle_time) + .arg(ids) + } + + + /// This is the optional arguments version for claiming unacked, pending messages + /// currently checked out by another consumer. + /// + /// ```no_run + /// use crate::{client_open,Connection,Commands,RedisResult}; + /// use crate::streams::{StreamClaimOptions,StreamClaimReply}; + /// let client = client_open("redis://127.0.0.1/0").unwrap(); + /// let mut con = client.get_connection().unwrap(); + /// + /// // Claim all pending messages for key "k1", + /// // from group "g1", checked out by consumer "c1" + /// // for 10ms with RETRYCOUNT 2 and FORCE + /// + /// let opts = StreamClaimOptions::default() + /// .with_force() + /// .retry(2); + /// let results: RedisResult = + /// con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts); + /// + /// // All optional arguments return a `Result` with one exception: + /// // Passing JUSTID returns only the message `id` and omits the HashMap for each message. + /// + /// let opts = StreamClaimOptions::default() + /// .with_justid(); + /// let results: RedisResult> = + /// con.xclaim_options("k1", "g1", "c1", 10, &["0"], opts); + /// ``` + /// + /// ```text + /// XCLAIM + /// [IDLE ] [TIME ] [RETRYCOUNT ] + /// [FORCE] [JUSTID] + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xclaim_options< + K: ToRedisArgs, + G: ToRedisArgs, + C: ToRedisArgs, + MIT: ToRedisArgs, + ID: ToRedisArgs + >( + key: K, + group: G, + consumer: C, + min_idle_time: MIT, + ids: &'a [ID], + options: streams::StreamClaimOptions + ) { + cmd("XCLAIM") + .arg(key) + .arg(group) + .arg(consumer) + .arg(min_idle_time) + .arg(ids) + .arg(options) + } + + + /// Deletes a list of `id`s for a given stream `key`. + /// + /// ```text + /// XDEL [ ... ] + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xdel( + key: K, + ids: &'a [ID] + ) { + cmd("XDEL").arg(key).arg(ids) + } + + + /// This command is used for creating a consumer `group`. It expects the stream key + /// to already exist. Otherwise, use `xgroup_create_mkstream` if it doesn't. + /// The `id` is the starting message id all consumers should read from. Use `$` If you want + /// all consumers to read from the last message added to stream. + /// + /// ```text + /// XGROUP CREATE + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xgroup_create( + key: K, + group: G, + id: ID + ) { + cmd("XGROUP") + .arg("CREATE") + .arg(key) + .arg(group) + .arg(id) + } + + + /// This is the alternate version for creating a consumer `group` + /// which makes the stream if it doesn't exist. 
+    ///
+    /// ```text
+    /// XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xgroup_create_mkstream<
+        K: ToRedisArgs,
+        G: ToRedisArgs,
+        ID: ToRedisArgs
+    >(
+        key: K,
+        group: G,
+        id: ID
+    ) {
+        cmd("XGROUP")
+            .arg("CREATE")
+            .arg(key)
+            .arg(group)
+            .arg(id)
+            .arg("MKSTREAM")
+    }
+
+
+    /// Alter which `id` you want consumers to begin reading from an existing
+    /// consumer `group`.
+    ///
+    /// ```text
+    /// XGROUP SETID <key> <groupname> <id or $>
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xgroup_setid<K: ToRedisArgs, G: ToRedisArgs, ID: ToRedisArgs>(
+        key: K,
+        group: G,
+        id: ID
+    ) {
+        cmd("XGROUP")
+            .arg("SETID")
+            .arg(key)
+            .arg(group)
+            .arg(id)
+    }
+
+
+    /// Destroy an existing consumer `group` for a given stream `key`.
+    ///
+    /// ```text
+    /// XGROUP DESTROY <key> <groupname>
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xgroup_destroy<K: ToRedisArgs, G: ToRedisArgs>(
+        key: K,
+        group: G
+    ) {
+        cmd("XGROUP").arg("DESTROY").arg(key).arg(group)
+    }
+
+    /// This deletes a `consumer` from an existing consumer `group`
+    /// for a given stream `key`.
+    ///
+    /// ```text
+    /// XGROUP DELCONSUMER <key> <groupname> <consumername>
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xgroup_delconsumer<K: ToRedisArgs, G: ToRedisArgs, C: ToRedisArgs>(
+        key: K,
+        group: G,
+        consumer: C
+    ) {
+        cmd("XGROUP")
+            .arg("DELCONSUMER")
+            .arg(key)
+            .arg(group)
+            .arg(consumer)
+    }
+
+
+    /// This returns all info details about
+    /// which consumers have read messages for a given consumer `group`.
+    /// Take note of the StreamInfoConsumersReply return type.
+    ///
+    /// *It's possible this return value might not contain new fields
+    /// added by Redis in future versions.*
+    ///
+    /// ```text
+    /// XINFO CONSUMERS <key> <group>
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xinfo_consumers<K: ToRedisArgs, G: ToRedisArgs>(
+        key: K,
+        group: G
+    ) {
+        cmd("XINFO")
+            .arg("CONSUMERS")
+            .arg(key)
+            .arg(group)
+    }
+
+
+    /// Returns all consumer `group`s created for a given stream `key`.
+    /// Take note of the StreamInfoGroupsReply return type.
+    ///
+    /// *It's possible this return value might not contain new fields
+    /// added by Redis in future versions.*
+    ///
+    /// ```text
+    /// XINFO GROUPS <key>
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xinfo_groups<K: ToRedisArgs>(key: K) {
+        cmd("XINFO").arg("GROUPS").arg(key)
+    }
+
+
+    /// Returns info about high-level stream details
+    /// (first & last message `id`, length, number of groups, etc.)
+    /// Take note of the StreamInfoStreamReply return type.
+    ///
+    /// *It's possible this return value might not contain new fields
+    /// added by Redis in future versions.*
+    ///
+    /// ```text
+    /// XINFO STREAM <key>
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xinfo_stream<K: ToRedisArgs>(key: K) {
+        cmd("XINFO").arg("STREAM").arg(key)
+    }
+
+    /// Returns the number of messages for a given stream `key`.
+    ///
+    /// ```text
+    /// XLEN <key>
+    /// ```
+    #[cfg(feature = "streams")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
+    fn xlen<K: ToRedisArgs>(key: K) {
+        cmd("XLEN").arg(key)
+    }
+
+
+    /// This is a basic version of making XPENDING command calls which only
+    /// passes a stream `key` and consumer `group` and it
+    /// returns details about which consumers have pending messages
+    /// that haven't been acked.
+    ///
+    /// You can use this method along with
+    /// `xclaim` or `xclaim_options` for determining which messages
+    /// need to be retried.
+ /// + /// Take note of the StreamPendingReply return type. + /// + /// ```text + /// XPENDING [ []] + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xpending( + key: K, + group: G + ) { + cmd("XPENDING").arg(key).arg(group) + } + + + /// This XPENDING version returns a list of all messages over the range. + /// You can use this for paginating pending messages (but without the message HashMap). + /// + /// Start and end follow the same rules `xrange` args. Set start to `-` + /// and end to `+` for the entire stream. + /// + /// Take note of the StreamPendingCountReply return type. + /// + /// ```text + /// XPENDING + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xpending_count< + K: ToRedisArgs, + G: ToRedisArgs, + S: ToRedisArgs, + E: ToRedisArgs, + C: ToRedisArgs + >( + key: K, + group: G, + start: S, + end: E, + count: C + ) { + cmd("XPENDING") + .arg(key) + .arg(group) + .arg(start) + .arg(end) + .arg(count) + } + + + /// An alternate version of `xpending_count` which filters by `consumer` name. + /// + /// Start and end follow the same rules `xrange` args. Set start to `-` + /// and end to `+` for the entire stream. + /// + /// Take note of the StreamPendingCountReply return type. + /// + /// ```text + /// XPENDING + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xpending_consumer_count< + K: ToRedisArgs, + G: ToRedisArgs, + S: ToRedisArgs, + E: ToRedisArgs, + C: ToRedisArgs, + CN: ToRedisArgs + >( + key: K, + group: G, + start: S, + end: E, + count: C, + consumer: CN + ) { + cmd("XPENDING") + .arg(key) + .arg(group) + .arg(start) + .arg(end) + .arg(count) + .arg(consumer) + } + + /// Returns a range of messages in a given stream `key`. + /// + /// Set `start` to `-` to begin at the first message. + /// Set `end` to `+` to end the most recent message. + /// You can pass message `id` to both `start` and `end`. + /// + /// Take note of the StreamRangeReply return type. + /// + /// ```text + /// XRANGE key start end + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xrange( + key: K, + start: S, + end: E + ) { + cmd("XRANGE").arg(key).arg(start).arg(end) + } + + + /// A helper method for automatically returning all messages in a stream by `key`. + /// **Use with caution!** + /// + /// ```text + /// XRANGE key - + + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xrange_all(key: K) { + cmd("XRANGE").arg(key).arg("-").arg("+") + } + + + /// A method for paginating a stream by `key`. + /// + /// ```text + /// XRANGE key start end [COUNT ] + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xrange_count( + key: K, + start: S, + end: E, + count: C + ) { + cmd("XRANGE") + .arg(key) + .arg(start) + .arg(end) + .arg("COUNT") + .arg(count) + } + + + /// Read a list of `id`s for each stream `key`. + /// This is the basic form of reading streams. + /// For more advanced control, like blocking, limiting, or reading by consumer `group`, + /// see `xread_options`. + /// + /// ```text + /// XREAD STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... 
ID_N + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xread( + keys: &'a [K], + ids: &'a [ID] + ) { + cmd("XREAD").arg("STREAMS").arg(keys).arg(ids) + } + + /// This method handles setting optional arguments for + /// `XREAD` or `XREADGROUP` Redis commands. + /// ```no_run + /// use crate::{client_open,Connection,RedisResult}; + /// use crate::streams::{StreamReadOptions,StreamReadReply}; + /// let client = client_open("redis://127.0.0.1/0").unwrap(); + /// let mut con = client.get_connection().unwrap(); + /// + /// // Read 10 messages from the start of the stream, + /// // without registering as a consumer group. + /// + /// let opts = StreamReadOptions::default() + /// .count(10); + /// let results: RedisResult = + /// con.xread_options(&["k1"], &["0"], opts); + /// + /// // Read all undelivered messages for a given + /// // consumer group. Be advised: the consumer group must already + /// // exist before making this call. Also note: we're passing + /// // '>' as the id here, which means all undelivered messages. + /// + /// let opts = StreamReadOptions::default() + /// .group("group-1", "consumer-1"); + /// let results: RedisResult = + /// con.xread_options(&["k1"], &[">"], opts); + /// ``` + /// + /// ```text + /// XREAD [BLOCK ] [COUNT ] + /// STREAMS key_1 key_2 ... key_N + /// ID_1 ID_2 ... ID_N + /// + /// XREADGROUP [BLOCK ] [COUNT ] [NOACK] [GROUP group-name consumer-name] + /// STREAMS key_1 key_2 ... key_N + /// ID_1 ID_2 ... ID_N + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xread_options( + keys: &'a [K], + ids: &'a [ID], + options: streams::StreamReadOptions + ) { + cmd(if options.read_only() { + "XREAD" + } else { + "XREADGROUP" + }) + .arg(options) + .arg("STREAMS") + .arg(keys) + .arg(ids) + } + + /// This is the reverse version of `xrange`. + /// The same rules apply for `start` and `end` here. + /// + /// ```text + /// XREVRANGE key end start + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xrevrange( + key: K, + end: E, + start: S + ) { + cmd("XREVRANGE").arg(key).arg(end).arg(start) + } + + /// This is the reverse version of `xrange_all`. + /// The same rules apply for `start` and `end` here. + /// + /// ```text + /// XREVRANGE key + - + /// ``` + fn xrevrange_all(key: K) { + cmd("XREVRANGE").arg(key).arg("+").arg("-") + } + + /// This is the reverse version of `xrange_count`. + /// The same rules apply for `start` and `end` here. + /// + /// ```text + /// XREVRANGE key end start [COUNT ] + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xrevrange_count( + key: K, + end: E, + start: S, + count: C + ) { + cmd("XREVRANGE") + .arg(key) + .arg(end) + .arg(start) + .arg("COUNT") + .arg(count) + } + + + /// Trim a stream `key` to a MAXLEN count. + /// + /// ```text + /// XTRIM MAXLEN [~|=] (Same as XADD MAXLEN option) + /// ``` + #[cfg(feature = "streams")] + #[cfg_attr(docsrs, doc(cfg(feature = "streams")))] + fn xtrim( + key: K, + maxlen: streams::StreamMaxlen + ) { + cmd("XTRIM").arg(key).arg(maxlen) + } } /// Allows pubsub callbacks to stop receiving messages. diff --git a/src/streams.rs b/src/streams.rs index 31f471cf1..1adc616ce 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -1 +1,659 @@ //! Defines types to use with the streams commands. 
+ +use crate::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value}; + +use std::collections::HashMap; +use std::io::{Error, ErrorKind}; + +// Stream Maxlen Enum + +/// Utility enum for passing `MAXLEN [= or ~] [COUNT]` +/// arguments into `StreamCommands`. +/// The enum value represents the count. +#[derive(PartialEq, Eq, Clone, Debug, Copy)] +pub enum StreamMaxlen { + /// Match an exact count + Equals(usize), + /// Match an approximate count + Aprrox(usize), +} + +impl ToRedisArgs for StreamMaxlen { + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + let (ch, val) = match *self { + StreamMaxlen::Equals(v) => ("=", v), + StreamMaxlen::Aprrox(v) => ("~", v), + }; + out.write_arg("MAXLEN".as_bytes()); + out.write_arg(ch.as_bytes()); + val.write_redis_args(out); + } +} + +/// Builder options for [`xclaim_options`] command. +/// +/// [`xclaim_options`]: ./trait.StreamCommands.html#method.xclaim_options +/// +#[derive(Default, Debug)] +pub struct StreamClaimOptions { + /// Set IDLE cmd arg. + idle: Option, + /// Set TIME cmd arg. + time: Option, + /// Set RETRYCOUNT cmd arg. + retry: Option, + /// Set FORCE cmd arg. + force: bool, + /// Set JUSTID cmd arg. Be advised: the response + /// type changes with this option. + justid: bool, +} + +impl StreamClaimOptions { + /// Set IDLE cmd arg. + pub fn idle(mut self, ms: usize) -> Self { + self.idle = Some(ms); + self + } + + /// Set TIME cmd arg. + pub fn time(mut self, ms_time: usize) -> Self { + self.time = Some(ms_time); + self + } + + /// Set RETRYCOUNT cmd arg. + pub fn retry(mut self, count: usize) -> Self { + self.retry = Some(count); + self + } + + /// Set FORCE cmd arg to true. + pub fn with_force(mut self) -> Self { + self.force = true; + self + } + + /// Set JUSTID cmd arg to true. Be advised: the response + /// type changes with this option. + pub fn with_justid(mut self) -> Self { + self.justid = true; + self + } +} + +impl ToRedisArgs for StreamClaimOptions { + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + if let Some(ref ms) = self.idle { + out.write_arg("IDLE".as_bytes()); + out.write_arg(format!("{}", ms).as_bytes()); + } + if let Some(ref ms_time) = self.time { + out.write_arg("TIME".as_bytes()); + out.write_arg(format!("{}", ms_time).as_bytes()); + } + if let Some(ref count) = self.retry { + out.write_arg("RETRYCOUNT".as_bytes()); + out.write_arg(format!("{}", count).as_bytes()); + } + if self.force { + out.write_arg("FORCE".as_bytes()); + } + if self.justid { + out.write_arg("JUSTID".as_bytes()); + } + } +} + +/// Builder options for [`xread_options`] command. +/// +/// [`xread_options`]: ./trait.StreamCommands.html#method.xread_options +/// +#[derive(Default, Debug)] +pub struct StreamReadOptions { + /// Set the BLOCK cmd arg. + block: Option, + /// Set the COUNT cmd arg. + count: Option, + /// Set the NOACK cmd arg. + noack: Option, + /// Set the GROUP cmd arg. + /// This option will toggle the cmd from XREAD to XREADGROUP. + group: Option<(Vec>, Vec>)>, +} + +impl StreamReadOptions { + /// Indicates whether the command is participating in a group + /// and generating ACKs + pub fn read_only(&self) -> bool { + self.group.is_none() + } + + /// Sets the command so that it avoids adding the message + /// to the PEL in cases where reliability is not a requirement + /// and the occasional message loss is acceptable. + pub fn noack(mut self) -> Self { + self.noack = Some(true); + self + } + + /// Sets the block time in milliseconds. 
+ pub fn block(mut self, ms: usize) -> Self { + self.block = Some(ms); + self + } + + /// Sets the maximum number of elements to return per stream. + pub fn count(mut self, n: usize) -> Self { + self.count = Some(n); + self + } + + /// Sets the name of a consumer group associated to the stream. + pub fn group( + mut self, + group_name: GN, + consumer_name: CN, + ) -> Self { + self.group = Some(( + ToRedisArgs::to_redis_args(&group_name), + ToRedisArgs::to_redis_args(&consumer_name), + )); + self + } +} + +impl ToRedisArgs for StreamReadOptions { + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + if let Some(ref ms) = self.block { + out.write_arg("BLOCK".as_bytes()); + out.write_arg(format!("{}", ms).as_bytes()); + } + + if let Some(ref n) = self.count { + out.write_arg("COUNT".as_bytes()); + out.write_arg(format!("{}", n).as_bytes()); + } + + if let Some(ref group) = self.group { + // noack is only available w/ xreadgroup + if let Some(true) = self.noack { + out.write_arg("NOACK".as_bytes()); + } + + out.write_arg("GROUP".as_bytes()); + for i in &group.0 { + out.write_arg(i); + } + for i in &group.1 { + out.write_arg(i); + } + } + } +} + +/// Reply type used with [`xread`] or [`xread_options`] commands. +/// +/// [`xread`]: ./trait.StreamCommands.html#method.xread +/// [`xread_options`]: ./trait.StreamCommands.html#method.xread_options +/// +#[derive(Default, Debug, Clone)] +pub struct StreamReadReply { + /// Complex data structure containing a payload for each key in this array + pub keys: Vec, +} + +/// Reply type used with [`xrange`], [`xrange_count`], [`xrange_all`], [`xrevrange`], [`xrevrange_count`], [`xrevrange_all`] commands. +/// +/// Represents stream entries matching a given range of `id`'s. +/// +/// [`xrange`]: ./trait.StreamCommands.html#method.xrange +/// [`xrange_count`]: ./trait.StreamCommands.html#method.xrange_count +/// [`xrange_all`]: ./trait.StreamCommands.html#method.xrange_all +/// [`xrevrange`]: ./trait.StreamCommands.html#method.xrevrange +/// [`xrevrange_count`]: ./trait.StreamCommands.html#method.xrevrange_count +/// [`xrevrange_all`]: ./trait.StreamCommands.html#method.xrevrange_all +/// +#[derive(Default, Debug, Clone)] +pub struct StreamRangeReply { + /// Complex data structure containing a payload for each ID in this array + pub ids: Vec, +} + +/// Reply type used with [`xclaim`] command. +/// +/// Represents that ownership of the specified messages was changed. +/// +/// [`xclaim`]: ./trait.StreamCommands.html#method.xclaim +/// +#[derive(Default, Debug, Clone)] +pub struct StreamClaimReply { + /// Complex data structure containing a payload for each ID in this array + pub ids: Vec, +} + +/// Reply type used with [`xpending`] command. +/// +/// Data returned here were fetched from the stream without +/// having been acknowledged. +/// +/// [`xpending`]: ./trait.StreamCommands.html#method.xpending +/// +#[derive(Debug, Clone)] +pub enum StreamPendingReply { + /// The stream is empty. + Empty, + /// Data with payload exists in the stream. + Data(StreamPendingData), +} + +impl Default for StreamPendingReply { + fn default() -> StreamPendingReply { + StreamPendingReply::Empty + } +} + +impl StreamPendingReply { + /// Returns how many records are in the reply. + pub fn count(&self) -> usize { + match self { + StreamPendingReply::Empty => 0, + StreamPendingReply::Data(x) => x.count, + } + } +} + +/// Inner reply type when an [`xpending`] command has data. 
+#[derive(Default, Debug, Clone)] +pub struct StreamPendingData { + /// Limit on the number of messages to return per call. + pub count: usize, + /// ID for the first pending record. + pub start_id: String, + /// ID for the final pending record. + pub end_id: String, + /// Every consumer in the consumer group with at + /// least one pending message, + /// and the number of pending messages it has. + pub consumers: Vec, +} + +/// Reply type used with [`xpending_count`] and +/// [`xpending_consumer_count`] commands. +/// +/// Data returned here have been fetched from the stream without +/// any acknowledgement. +/// +/// [`xpending_count`]: ./trait.StreamCommands.html#method.xpending_count +/// [`xpending_consumer_count`]: ./trait.StreamCommands.html#method.xpending_consumer_count +/// +#[derive(Default, Debug, Clone)] +pub struct StreamPendingCountReply { + /// An array of structs containing information about + /// message IDs yet to be acknowledged by various consumers, + /// time since last ack, and total number of acks by that consumer. + pub ids: Vec, +} + +/// Reply type used with [`xinfo_stream`] command, containing +/// general information about the stream stored at the specified key. +/// +/// The very first and last IDs in the stream are shown, +/// in order to give some sense about what is the stream content. +/// +/// [`xinfo_stream`]: ./trait.StreamCommands.html#method.xinfo_stream +/// +#[derive(Default, Debug, Clone)] +pub struct StreamInfoStreamReply { + /// The last generated ID that may not be the same as the last + /// entry ID in case some entry was deleted. + pub last_generated_id: String, + /// Details about the radix tree representing the stream mostly + /// useful for optimization and debugging tasks. + pub radix_tree_keys: usize, + /// The number of consumer groups associated with the stream. + pub groups: usize, + /// Number of elements of the stream. + pub length: usize, + /// The very first entry in the stream. + pub first_entry: StreamId, + /// The very last entry in the stream. + pub last_entry: StreamId, +} + +/// Reply type used with [`xinfo_consumer`] command, an array of every +/// consumer in a specific consumer group. +/// +/// [`xinfo_consumer`]: ./trait.StreamCommands.html#method.xinfo_consumer +/// +#[derive(Default, Debug, Clone)] +pub struct StreamInfoConsumersReply { + /// An array of every consumer in a specific consumer group. + pub consumers: Vec, +} + +/// Reply type used with [`xinfo_groups`] command. +/// +/// This output represents all the consumer groups associated with +/// the stream. +/// +/// [`xinfo_groups`]: ./trait.StreamCommands.html#method.xinfo_groups +/// +#[derive(Default, Debug, Clone)] +pub struct StreamInfoGroupsReply { + /// All the consumer groups associated with the stream. + pub groups: Vec, +} + +/// A consumer parsed from [`xinfo_consumers`] command. +/// +/// [`xinfo_consumers`]: ./trait.StreamCommands.html#method.xinfo_consumers +/// +#[derive(Default, Debug, Clone)] +pub struct StreamInfoConsumer { + /// Name of the consumer group. + pub name: String, + /// Number of pending messages for this specific consumer. + pub pending: usize, + /// This consumer's idle time in milliseconds. + pub idle: usize, +} + +/// A group parsed from [`xinfo_groups`] command. +/// +/// [`xinfo_groups`]: ./trait.StreamCommands.html#method.xinfo_groups +/// +#[derive(Default, Debug, Clone)] +pub struct StreamInfoGroup { + /// The group name. + pub name: String, + /// Number of consumers known in the group. 
+ pub consumers: usize, + /// Number of pending messages (delivered but not yet acknowledged) in the group. + pub pending: usize, + /// Last ID delivered to this group. + pub last_delivered_id: String, +} + +/// Represents a pending message parsed from `xpending` methods. +#[derive(Default, Debug, Clone)] +pub struct StreamPendingId { + /// The ID of the message. + pub id: String, + /// The name of the consumer that fetched the message and has + /// still to acknowledge it. We call it the current owner + /// of the message. + pub consumer: String, + /// The number of milliseconds that elapsed since the + /// last time this message was delivered to this consumer. + pub last_delivered_ms: usize, + /// The number of times this message was delivered. + pub times_delivered: usize, +} + +/// Represents a stream `key` and its `id`'s parsed from `xread` methods. +#[derive(Default, Debug, Clone)] +pub struct StreamKey { + /// The stream `key`. + pub key: String, + /// The parsed stream `id`'s. + pub ids: Vec, +} + +impl StreamKey { + /// Return only the stream `id`'s without their data. + pub fn just_ids(&self) -> Vec<&String> { + self.ids.iter().map(|msg| &msg.id).collect::>() + } +} + +/// Represents a stream `id` and its field/values as a `HashMap` +#[derive(Default, Debug, Clone)] +pub struct StreamId { + /// The stream `id` (entry ID) of this particular message. + pub id: String, + /// All fields in this message, associated with their respective values. + pub map: HashMap, +} + +impl StreamId { + /// Converts a `Value::Bulk` into a `StreamId`. + pub fn from_bulk_value(v: &Value) -> RedisResult { + let mut stream_id = StreamId::default(); + match *v { + Value::Bulk(ref values) => { + if let Some(v) = values.get(0) { + stream_id.id = from_redis_value(&v)?; + } + if let Some(v) = values.get(1) { + stream_id.map = from_redis_value(&v)?; + } + } + _ => {} + } + + Ok(stream_id) + } + + /// Fetches value of a given field and converts it to the specified + /// type. + pub fn get(&self, key: &str) -> Option { + match self.find(&key) { + Some(ref x) => from_redis_value(*x).ok(), + None => None, + } + } + + /// Fetches the value of a given field without performing any + /// type conversion. + pub fn find(&self, key: &&str) -> Option<&Value> { + self.map.get(*key) + } + + /// Does the message contain a particular field? + pub fn contains_key(&self, key: &&str) -> bool { + self.find(key).is_some() + } + + /// Returns how many field/value pairs exist in this message. 
+ pub fn len(&self) -> usize { + self.map.len() + } +} + +impl FromRedisValue for StreamReadReply { + fn from_redis_value(v: &Value) -> RedisResult { + let rows: Vec>>>> = + from_redis_value(v)?; + let mut reply = StreamReadReply::default(); + for row in &rows { + for (key, entry) in row.iter() { + let mut k = StreamKey::default(); + k.key = key.to_owned(); + for id_row in entry { + let mut i = StreamId::default(); + for (id, map) in id_row.iter() { + i.id = id.to_owned(); + i.map = map.to_owned(); + } + k.ids.push(i); + } + reply.keys.push(k); + } + } + Ok(reply) + } +} + +impl FromRedisValue for StreamRangeReply { + fn from_redis_value(v: &Value) -> RedisResult { + let rows: Vec>> = from_redis_value(v)?; + let mut reply = StreamRangeReply::default(); + for row in &rows { + let mut i = StreamId::default(); + for (id, map) in row.iter() { + i.id = id.to_owned(); + i.map = map.to_owned(); + } + reply.ids.push(i); + } + Ok(reply) + } +} + +impl FromRedisValue for StreamClaimReply { + fn from_redis_value(v: &Value) -> RedisResult { + let rows: Vec>> = from_redis_value(v)?; + let mut reply = StreamClaimReply::default(); + for row in &rows { + let mut i = StreamId::default(); + for (id, map) in row.iter() { + i.id = id.to_owned(); + i.map = map.to_owned(); + } + reply.ids.push(i); + } + Ok(reply) + } +} + +impl FromRedisValue for StreamPendingReply { + fn from_redis_value(v: &Value) -> RedisResult { + let parts: (usize, Option, Option, Vec>) = from_redis_value(v)?; + let count = parts.0.to_owned() as usize; + + if count == 0 { + Ok(StreamPendingReply::Empty) + } else { + let mut result = StreamPendingData::default(); + + let start_id = match parts.1.to_owned() { + Some(start) => Ok(start), + None => Err(Error::new( + ErrorKind::Other, + "IllegalState: Non-zero pending expects start id", + )), + }?; + + let end_id = match parts.2.to_owned() { + Some(end) => Ok(end), + None => Err(Error::new( + ErrorKind::Other, + "IllegalState: Non-zero pending expects end id", + )), + }?; + + result.count = count; + result.start_id = start_id; + result.end_id = end_id; + + for consumer in &parts.3 { + let mut info = StreamInfoConsumer::default(); + info.name = consumer[0].to_owned(); + if let Ok(v) = consumer[1].to_owned().parse::() { + info.pending = v; + } + result.consumers.push(info); + } + + Ok(StreamPendingReply::Data(result)) + } + } +} + +impl FromRedisValue for StreamPendingCountReply { + fn from_redis_value(v: &Value) -> RedisResult { + let parts: Vec> = from_redis_value(v)?; + let mut reply = StreamPendingCountReply::default(); + for row in &parts { + let mut p = StreamPendingId::default(); + p.id = row[0].0.to_owned(); + p.consumer = row[0].1.to_owned(); + p.last_delivered_ms = row[0].2.to_owned(); + p.times_delivered = row[0].3.to_owned(); + reply.ids.push(p); + } + Ok(reply) + } +} + +impl FromRedisValue for StreamInfoStreamReply { + fn from_redis_value(v: &Value) -> RedisResult { + let map: HashMap = from_redis_value(v)?; + let mut reply = StreamInfoStreamReply::default(); + if let Some(v) = &map.get("last-generated-id") { + reply.last_generated_id = from_redis_value(v)?; + } + if let Some(v) = &map.get("radix-tree-nodes") { + reply.radix_tree_keys = from_redis_value(v)?; + } + if let Some(v) = &map.get("groups") { + reply.groups = from_redis_value(v)?; + } + if let Some(v) = &map.get("length") { + reply.length = from_redis_value(v)?; + } + if let Some(v) = &map.get("first-entry") { + reply.first_entry = StreamId::from_bulk_value(v)?; + } + if let Some(v) = &map.get("last-entry") { + 
reply.last_entry = StreamId::from_bulk_value(v)?; + } + Ok(reply) + } +} + +impl FromRedisValue for StreamInfoConsumersReply { + fn from_redis_value(v: &Value) -> RedisResult { + let consumers: Vec> = from_redis_value(v)?; + let mut reply = StreamInfoConsumersReply::default(); + for map in consumers { + let mut c = StreamInfoConsumer::default(); + if let Some(v) = &map.get("name") { + c.name = from_redis_value(v)?; + } + if let Some(v) = &map.get("pending") { + c.pending = from_redis_value(v)?; + } + if let Some(v) = &map.get("idle") { + c.idle = from_redis_value(v)?; + } + reply.consumers.push(c); + } + + Ok(reply) + } +} + +impl FromRedisValue for StreamInfoGroupsReply { + fn from_redis_value(v: &Value) -> RedisResult { + let groups: Vec> = from_redis_value(v)?; + let mut reply = StreamInfoGroupsReply::default(); + for map in groups { + let mut g = StreamInfoGroup::default(); + if let Some(v) = &map.get("name") { + g.name = from_redis_value(v)?; + } + if let Some(v) = &map.get("pending") { + g.pending = from_redis_value(v)?; + } + if let Some(v) = &map.get("consumers") { + g.consumers = from_redis_value(v)?; + } + if let Some(v) = &map.get("last-delivered-id") { + g.last_delivered_id = from_redis_value(v)?; + } + reply.groups.push(g); + } + Ok(reply) + } +} diff --git a/tests/test_streams.rs b/tests/test_streams.rs new file mode 100644 index 000000000..f160116fd --- /dev/null +++ b/tests/test_streams.rs @@ -0,0 +1,568 @@ +#![cfg(feature = "streams")] + +use redis::streams::*; +use redis::{Commands, Connection, RedisResult, ToRedisArgs}; + +mod support; +use crate::support::*; + +use std::collections::BTreeMap; +use std::str; +use std::thread::sleep; +use std::time::Duration; + +macro_rules! assert_args { + ($value:expr, $($args:expr),+) => { + let args = $value.to_redis_args(); + let strings: Vec<_> = args.iter() + .map(|a| str::from_utf8(a.as_ref()).unwrap()) + .collect(); + assert_eq!(strings, vec![$($args),+]); + } +} + +fn xadd(con: &mut Connection) { + let _: RedisResult = + con.xadd("k1", "1000-0", &[("hello", "world"), ("redis", "streams")]); + let _: RedisResult = con.xadd("k1", "1000-1", &[("hello", "world2")]); + let _: RedisResult = con.xadd("k2", "2000-0", &[("hello", "world")]); + let _: RedisResult = con.xadd("k2", "2000-1", &[("hello", "world2")]); +} + +fn xadd_keyrange(con: &mut Connection, key: &str, start: i32, end: i32) { + for _i in start..end { + let _: RedisResult = con.xadd(key, "*", &[("h", "w")]); + } +} + +#[test] +fn test_cmd_options() { + // Tests the following command option builders.... 
+ // xclaim_options + // xread_options + // maxlen enum + + // test read options + + let empty = StreamClaimOptions::default(); + assert_eq!(ToRedisArgs::to_redis_args(&empty).len(), 0); + + let empty = StreamReadOptions::default(); + assert_eq!(ToRedisArgs::to_redis_args(&empty).len(), 0); + + let opts = StreamClaimOptions::default() + .idle(50) + .time(500) + .retry(3) + .with_force() + .with_justid(); + + assert_args!( + &opts, + "IDLE", + "50", + "TIME", + "500", + "RETRYCOUNT", + "3", + "FORCE", + "JUSTID" + ); + + // test maxlen options + + assert_args!(StreamMaxlen::Aprrox(10), "MAXLEN", "~", "10"); + assert_args!(StreamMaxlen::Equals(10), "MAXLEN", "=", "10"); + + // test read options + + let opts = StreamReadOptions::default() + .noack() + .block(100) + .count(200) + .group("group-name", "consumer-name"); + + assert_args!( + &opts, + "BLOCK", + "100", + "COUNT", + "200", + "NOACK", + "GROUP", + "group-name", + "consumer-name" + ); + + // should skip noack because of missing group(,) + let opts = StreamReadOptions::default().noack().block(100).count(200); + + assert_args!(&opts, "BLOCK", "100", "COUNT", "200"); +} + +#[test] +fn test_assorted_1() { + // Tests the following commands.... + // xadd + // xadd_map (skip this for now) + // xadd_maxlen + // xread + // xlen + + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + xadd(&mut con); + + // smoke test that we get the same id back + let result: RedisResult = con.xadd("k0", "1000-0", &[("x", "y")]); + assert_eq!(result.unwrap(), "1000-0"); + + // xread reply + let reply: StreamReadReply = con.xread(&["k1", "k2", "k3"], &["0", "0", "0"]).unwrap(); + + // verify reply contains 2 keys even though we asked for 3 + assert_eq!(&reply.keys.len(), &2usize); + + // verify first key & first id exist + assert_eq!(&reply.keys[0].key, "k1"); + assert_eq!(&reply.keys[0].ids.len(), &2usize); + assert_eq!(&reply.keys[0].ids[0].id, "1000-0"); + + // lookup the key in StreamId map + let hello: Option = reply.keys[0].ids[0].get("hello"); + assert_eq!(hello, Some("world".to_string())); + + // verify the second key was written + assert_eq!(&reply.keys[1].key, "k2"); + assert_eq!(&reply.keys[1].ids.len(), &2usize); + assert_eq!(&reply.keys[1].ids[0].id, "2000-0"); + + // test xadd_map + let mut map: BTreeMap<&str, &str> = BTreeMap::new(); + map.insert("ab", "cd"); + map.insert("ef", "gh"); + map.insert("ij", "kl"); + let _: RedisResult = con.xadd_map("k3", "3000-0", map); + + let reply: StreamRangeReply = con.xrange_all("k3").unwrap(); + assert_eq!(reply.ids[0].contains_key(&"ab"), true); + assert_eq!(reply.ids[0].contains_key(&"ef"), true); + assert_eq!(reply.ids[0].contains_key(&"ij"), true); + + // test xadd w/ maxlength below... + + // add 100 things to k4 + xadd_keyrange(&mut con, "k4", 0, 100); + + // test xlen.. should have 100 items + let result: RedisResult = con.xlen("k4"); + assert_eq!(result, Ok(100)); + + // test xadd_maxlen + let _: RedisResult = + con.xadd_maxlen("k4", StreamMaxlen::Equals(10), "*", &[("h", "w")]); + let result: RedisResult = con.xlen("k4"); + assert_eq!(result, Ok(10)); +} + +#[test] +fn test_assorted_2() { + // Tests the following commands.... + // xadd + // xinfo_stream + // xinfo_groups + // xinfo_consumer + // xgroup_create + // xgroup_create_mkstream + // xread_options + // xack + // xpending + // xpending_count + // xpending_consumer_count + + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + xadd(&mut con); + + // no key exists... 
this call breaks the connection pipe for some reason + let reply: RedisResult = con.xinfo_stream("k10"); + assert_eq!(reply.is_err(), true); + + // redo the connection because the above error + con = ctx.connection(); + + // key should exist + let reply: StreamInfoStreamReply = con.xinfo_stream("k1").unwrap(); + assert_eq!(&reply.first_entry.id, "1000-0"); + assert_eq!(&reply.last_entry.id, "1000-1"); + assert_eq!(&reply.last_generated_id, "1000-1"); + + // xgroup create (existing stream) + let result: RedisResult = con.xgroup_create("k1", "g1", "$"); + assert_eq!(result.is_ok(), true); + + // xinfo groups (existing stream) + let result: RedisResult = con.xinfo_groups("k1"); + assert_eq!(result.is_ok(), true); + let reply = result.unwrap(); + assert_eq!(&reply.groups.len(), &1); + assert_eq!(&reply.groups[0].name, &"g1"); + + // test xgroup create w/ mkstream @ 0 + let result: RedisResult = con.xgroup_create_mkstream("k99", "g99", "0"); + assert_eq!(result.is_ok(), true); + + // Since nothing exists on this stream yet, + // it should have the defaults returned by the client + let result: RedisResult = con.xinfo_groups("k99"); + assert_eq!(result.is_ok(), true); + let reply = result.unwrap(); + assert_eq!(&reply.groups.len(), &1); + assert_eq!(&reply.groups[0].name, &"g99"); + assert_eq!(&reply.groups[0].last_delivered_id, &"0-0"); + + // call xadd on k99 just so we can read from it + // using consumer g99 and test xinfo_consumers + let _: RedisResult = con.xadd("k99", "1000-0", &[("a", "b"), ("c", "d")]); + let _: RedisResult = con.xadd("k99", "1000-1", &[("e", "f"), ("g", "h")]); + + // test empty PEL + let empty_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap(); + + assert_eq!(empty_reply.count(), 0); + if let StreamPendingReply::Empty = empty_reply { + // looks good + } else { + panic!("Expected StreamPendingReply::Empty but got Data"); + } + + // passing options w/ group triggers XREADGROUP + // using ID=">" means all undelivered ids + // otherwise, ID="0 | ms-num" means all pending already + // sent to this client + let reply: StreamReadReply = con + .xread_options( + &["k99"], + &[">"], + StreamReadOptions::default().group("g99", "c99"), + ) + .unwrap(); + assert_eq!(reply.keys[0].ids.len(), 2); + + // read xinfo consumers again, should have 2 messages for the c99 consumer + let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap(); + assert_eq!(reply.consumers[0].pending, 2); + + // ack one of these messages + let result: RedisResult = con.xack("k99", "g99", &["1000-0"]); + assert_eq!(result, Ok(1)); + + // get pending messages already seen by this client + // we should only have one now.. + let reply: StreamReadReply = con + .xread_options( + &["k99"], + &["0"], + StreamReadOptions::default().group("g99", "c99"), + ) + .unwrap(); + assert_eq!(reply.keys.len(), 1); + + // we should also have one pending here... + let reply: StreamInfoConsumersReply = con.xinfo_consumers("k99", "g99").unwrap(); + assert_eq!(reply.consumers[0].pending, 1); + + // add more and read so we can test xpending + let _: RedisResult = con.xadd("k99", "1001-0", &[("i", "j"), ("k", "l")]); + let _: RedisResult = con.xadd("k99", "1001-1", &[("m", "n"), ("o", "p")]); + let _: StreamReadReply = con + .xread_options( + &["k99"], + &[">"], + StreamReadOptions::default().group("g99", "c99"), + ) + .unwrap(); + + // call xpending here... 
+ // this has a different reply from what the count variations return + let data_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap(); + + assert_eq!(data_reply.count(), 3); + + if let StreamPendingReply::Data(data) = data_reply { + assert_eq!(data.start_id, "1000-1"); + assert_eq!(data.end_id, "1001-1"); + assert_eq!(data.consumers.len(), 1); + assert_eq!(data.consumers[0].name, "c99"); + } else { + panic!("Expected StreamPendingReply::Data but got Empty"); + } + + // both count variations have the same reply types + let reply: StreamPendingCountReply = con.xpending_count("k99", "g99", "-", "+", 10).unwrap(); + assert_eq!(reply.ids.len(), 3); + + let reply: StreamPendingCountReply = con + .xpending_consumer_count("k99", "g99", "-", "+", 10, "c99") + .unwrap(); + assert_eq!(reply.ids.len(), 3); +} + +#[test] +fn test_xadd_maxlen_map() { + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + for i in 0..10 { + let mut map: BTreeMap<&str, &str> = BTreeMap::new(); + let idx = i.to_string(); + map.insert("idx", &idx); + let _: RedisResult = + con.xadd_maxlen_map("maxlen_map", StreamMaxlen::Equals(3), "*", map); + } + + let result: RedisResult = con.xlen("maxlen_map"); + assert_eq!(result, Ok(3)); + let reply: StreamRangeReply = con.xrange_all("maxlen_map").unwrap(); + + assert_eq!(reply.ids[0].get("idx"), Some("7".to_string())); + assert_eq!(reply.ids[1].get("idx"), Some("8".to_string())); + assert_eq!(reply.ids[2].get("idx"), Some("9".to_string())); +} + +#[test] +fn test_xclaim() { + // Tests the following commands.... + // xclaim + // xclaim_options + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + // xclaim test basic idea: + // 1. we need to test adding messages to a group + // 2. then xreadgroup needs to define a consumer and read pending + // messages without acking them + // 3. then we need to sleep 5ms and call xpending + // 4. from here we should be able to claim message + // past the idle time and read them from a different consumer + + // create the group + let result: RedisResult = con.xgroup_create_mkstream("k1", "g1", "$"); + assert_eq!(result.is_ok(), true); + + // add some keys + xadd_keyrange(&mut con, "k1", 0, 10); + + // read the pending items for this key & group + let reply: StreamReadReply = con + .xread_options( + &["k1"], + &[">"], + StreamReadOptions::default().group("g1", "c1"), + ) + .unwrap(); + // verify we have 10 ids + assert_eq!(reply.keys[0].ids.len(), 10); + + // save this StreamId for later + let claim = &reply.keys[0].ids[0]; + let _claim_1 = &reply.keys[0].ids[1]; + let claim_justids = &reply.keys[0].just_ids(); + + // sleep for 5ms + sleep(Duration::from_millis(5)); + + // grab this id if > 4ms + let reply: StreamClaimReply = con + .xclaim("k1", "g1", "c2", 4, &[claim.id.clone()]) + .unwrap(); + assert_eq!(reply.ids.len(), 1); + assert_eq!(reply.ids[0].id, claim.id); + + // grab all pending ids for this key... 
+ // we should 9 in c1 and 1 in c2 + let reply: StreamPendingReply = con.xpending("k1", "g1").unwrap(); + if let StreamPendingReply::Data(data) = reply { + assert_eq!(data.consumers[0].name, "c1"); + assert_eq!(data.consumers[0].pending, 9); + assert_eq!(data.consumers[1].name, "c2"); + assert_eq!(data.consumers[1].pending, 1); + } + + // sleep for 5ms + sleep(Duration::from_millis(5)); + + // lets test some of the xclaim_options + // call force on the same claim.id + let _: StreamClaimReply = con + .xclaim_options( + "k1", + "g1", + "c3", + 4, + &[claim.id.clone()], + StreamClaimOptions::default().with_force(), + ) + .unwrap(); + + let reply: StreamPendingReply = con.xpending("k1", "g1").unwrap(); + // we should have 9 w/ c1 and 1 w/ c3 now + if let StreamPendingReply::Data(data) = reply { + assert_eq!(data.consumers[1].name, "c3"); + assert_eq!(data.consumers[1].pending, 1); + } + + // sleep for 5ms + sleep(Duration::from_millis(5)); + + // claim and only return JUSTID + let claimed: Vec = con + .xclaim_options( + "k1", + "g1", + "c5", + 4, + &claim_justids, + StreamClaimOptions::default().with_force().with_justid(), + ) + .unwrap(); + // we just claimed the original 10 ids + // and only returned the ids + assert_eq!(claimed.len(), 10); +} + +#[test] +fn test_xdel() { + // Tests the following commands.... + // xdel + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + // add some keys + xadd(&mut con); + + // delete the first stream item for this key + let result: RedisResult = con.xdel("k1", &["1000-0"]); + // returns the number of items deleted + assert_eq!(result, Ok(1)); + + let result: RedisResult = con.xdel("k2", &["2000-0", "2000-1", "2000-2"]); + // should equal 2 since the last id doesn't exist + assert_eq!(result, Ok(2)); +} + +#[test] +fn test_xtrim() { + // Tests the following commands.... + // xtrim + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + // add some keys + xadd_keyrange(&mut con, "k1", 0, 100); + + // trim key to 50 + // returns the number of items remaining in the stream + let result: RedisResult = con.xtrim("k1", StreamMaxlen::Equals(50)); + assert_eq!(result, Ok(50)); + // we should end up with 40 after this call + let result: RedisResult = con.xtrim("k1", StreamMaxlen::Equals(10)); + assert_eq!(result, Ok(40)); +} + +#[test] +fn test_xgroup() { + // Tests the following commands.... 
+ // xgroup_create_mkstream + // xgroup_destroy + // xgroup_delconsumer + + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + // test xgroup create w/ mkstream @ 0 + let result: RedisResult = con.xgroup_create_mkstream("k1", "g1", "0"); + assert_eq!(result.is_ok(), true); + + // destroy this new stream group + let result: RedisResult = con.xgroup_destroy("k1", "g1"); + assert_eq!(result, Ok(1)); + + // add some keys + xadd(&mut con); + + // create the group again using an existing stream + let result: RedisResult = con.xgroup_create("k1", "g1", "0"); + assert_eq!(result.is_ok(), true); + + // read from the group so we can register the consumer + let reply: StreamReadReply = con + .xread_options( + &["k1"], + &[">"], + StreamReadOptions::default().group("g1", "c1"), + ) + .unwrap(); + assert_eq!(reply.keys[0].ids.len(), 2); + + let result: RedisResult = con.xgroup_delconsumer("k1", "g1", "c1"); + // returns the number of pending message this client had open + assert_eq!(result, Ok(2)); + + let result: RedisResult = con.xgroup_destroy("k1", "g1"); + assert_eq!(result, Ok(1)); +} + +#[test] +fn test_xrange() { + // Tests the following commands.... + // xrange (-/+ variations) + // xrange_all + // xrange_count + + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + xadd(&mut con); + + // xrange replies + let reply: StreamRangeReply = con.xrange_all("k1").unwrap(); + assert_eq!(reply.ids.len(), 2); + + let reply: StreamRangeReply = con.xrange("k1", "1000-1", "+").unwrap(); + assert_eq!(reply.ids.len(), 1); + + let reply: StreamRangeReply = con.xrange("k1", "-", "1000-0").unwrap(); + assert_eq!(reply.ids.len(), 1); + + let reply: StreamRangeReply = con.xrange_count("k1", "-", "+", 1).unwrap(); + assert_eq!(reply.ids.len(), 1); +} + +#[test] +fn test_xrevrange() { + // Tests the following commands.... + // xrevrange (+/- variations) + // xrevrange_all + // xrevrange_count + + let ctx = TestContext::new(); + let mut con = ctx.connection(); + + xadd(&mut con); + + // xrange replies + let reply: StreamRangeReply = con.xrevrange_all("k1").unwrap(); + assert_eq!(reply.ids.len(), 2); + + let reply: StreamRangeReply = con.xrevrange("k1", "1000-1", "-").unwrap(); + assert_eq!(reply.ids.len(), 2); + + let reply: StreamRangeReply = con.xrevrange("k1", "+", "1000-1").unwrap(); + assert_eq!(reply.ids.len(), 1); + + let reply: StreamRangeReply = con.xrevrange_count("k1", "+", "-", 1).unwrap(); + assert_eq!(reply.ids.len(), 1); +}
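
For readers skimming the diff, here is a minimal end-to-end sketch of how the new `StreamCommands` methods fit together, following the same call patterns as `tests/test_streams.rs`. It is not part of the patch; it assumes the `streams` feature is enabled (it is now in `default` per the Cargo.toml change), a Redis server reachable at `redis://127.0.0.1/`, and made-up stream/group/consumer names (`events`, `g1`, `c1`).

```rust
// Illustrative sketch, not part of the patch. Names are hypothetical.
use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
use redis::{Commands, RedisResult};

fn main() -> RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_connection()?;

    // Create the consumer group up front (MKSTREAM also creates the stream).
    // The result is ignored so reruns don't fail when the group already exists.
    let _: RedisResult<()> = con.xgroup_create_mkstream("events", "g1", "$");

    // Produce an entry, letting Redis pick the id (`*`) and keeping the
    // stream trimmed to roughly 1000 entries.
    let id: String = con.xadd_maxlen(
        "events",
        StreamMaxlen::Aprrox(1000), // variant name as spelled in this patch
        "*",
        &[("kind", "signup"), ("user", "u42")],
    )?;
    println!("wrote {}", id);

    // Read undelivered entries (`>`) as consumer c1 of group g1,
    // blocking for up to 5 seconds and taking at most 10 entries.
    let opts = StreamReadOptions::default()
        .block(5_000)
        .count(10)
        .group("g1", "c1");
    let reply: StreamReadReply = con.xread_options(&["events"], &[">"], opts)?;

    // Acknowledge everything that was just delivered.
    for key in reply.keys {
        let ids: Vec<String> = key.ids.iter().map(|m| m.id.clone()).collect();
        if !ids.is_empty() {
            let acked: usize = con.xack("events", "g1", &ids)?;
            println!("acked {} entries", acked);
        }
    }
    Ok(())
}
```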
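
The `xpending`/`xclaim` doc comments in this patch describe using the pending entries list to retry messages that another consumer checked out but never acknowledged. Below is a hedged sketch of that recovery loop; the stream, group, and consumer names (`events`, `g1`, `c2`) and the idle-time filter are illustrative, not prescribed by the patch.

```rust
// Illustrative sketch, not part of the patch.
use redis::streams::{StreamClaimReply, StreamPendingCountReply};
use redis::{Commands, Connection, RedisResult};

/// Claim entries that have sat unacknowledged longer than `min_idle_ms`
/// and hand them to consumer "c2" for reprocessing.
fn reclaim_stale(con: &mut Connection, min_idle_ms: usize) -> RedisResult<usize> {
    // List up to 100 pending entries for the group, oldest first.
    let pending: StreamPendingCountReply =
        con.xpending_count("events", "g1", "-", "+", 100)?;

    // Keep only the ids that have been idle for at least `min_idle_ms`.
    let stale: Vec<String> = pending
        .ids
        .iter()
        .filter(|p| p.last_delivered_ms >= min_idle_ms)
        .map(|p| p.id.clone())
        .collect();

    if stale.is_empty() {
        return Ok(0);
    }

    // XCLAIM transfers ownership; the reply carries the claimed messages.
    let claimed: StreamClaimReply =
        con.xclaim("events", "g1", "c2", min_idle_ms, &stale)?;
    Ok(claimed.ids.len())
}
```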