diff --git a/.vscode/settings.json b/.vscode/settings.json index 111ee693..608e2d66 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,6 @@ { - "rust-analyzer.cargo.target": "i686-pc-windows-msvc" + "rust-analyzer.cargo.target": "i686-pc-windows-msvc", + "rust-analyzer.linkedProjects": [ + ".\\Cargo.toml" + ] } diff --git a/Cargo.lock b/Cargo.lock index d3f5ab52..e912deff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,17 +61,6 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" -[[package]] -name = "async-trait" -version = "0.1.56" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.98", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -530,12 +519,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dtoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" - [[package]] name = "either" version = "1.7.0" @@ -864,7 +847,7 @@ checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ "bytes", "fnv", - "itoa 1.0.2", + "itoa", ] [[package]] @@ -905,7 +888,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa 1.0.2", + "itoa", "pin-project-lite", "socket2 0.4.4", "tokio", @@ -1004,12 +987,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" -[[package]] -name = "itoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" - [[package]] name = "itoa" version = "1.0.2" @@ -1332,7 +1309,7 @@ dependencies = [ "saturating", "serde", "serde_json", - "sha1 0.10.1", + "sha1", "sha2", "smallvec", "subprocess", @@ -1866,16 +1843,15 @@ dependencies = [ [[package]] name = "redis" -version = "0.21.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a80b5f38d7f5a020856a0e16e40a9cfabf88ae8f0e4c2dcd8a3114c1e470852" +checksum = "3ea8c51b5dc1d8e5fd3350ec8167f464ec0995e79f2e90a075b63371500d557f" dependencies = [ - "async-trait", "combine", - "dtoa", - "itoa 0.4.8", + "itoa", "percent-encoding", - "sha1 0.6.1", + "ryu", + "sha1_smol", "url", ] @@ -2132,7 +2108,7 @@ version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" dependencies = [ - "itoa 1.0.2", + "itoa", "ryu", "serde", ] @@ -2153,7 +2129,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 1.0.2", + "itoa", "ryu", "serde", ] @@ -2169,15 +2145,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha1" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" -dependencies = [ - "sha1_smol", -] - [[package]] name = "sha1" version = "0.10.1" @@ -2378,7 +2345,7 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72c91f41dcb2f096c05f0873d667dceec1087ce5bcf984ec8ffb19acddbb3217" dependencies = [ - "itoa 1.0.2", + "itoa", "libc", "num_threads", "time-macros", @@ -2902,7 +2869,7 @@ dependencies = [ "flate2", "hmac", "pbkdf2", - "sha1 0.10.1", + "sha1", "time 0.3.11", "zstd", ] diff --git a/Cargo.toml b/Cargo.toml index 478d5a72..35e1ccb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ png = { version = "0.17", optional = true } image = { version = "0.24", optional = true, default-features = false, features = ["png"] } git2 = { version = "0.17.1", optional = true, default-features = false } noise = { version = "0.8", optional = true } -redis = { version = "0.21", optional = true } +redis = { version = "0.23", optional = true } reqwest = { version = "0.11", optional = true, default-features = false, features = [ "blocking", "rustls-tls", @@ -103,6 +103,7 @@ hash = [ ] pathfinder = ["num", "pathfinding", "serde", "serde_json"] redis_pubsub = ["flume", "redis", "serde", "serde_json"] +redis_reliablequeue = ["flume", "redis", "serde", "serde_json"] unzip = ["zip", "jobs"] worleynoise = ["rand", "rayon"] diff --git a/README.md b/README.md index 91e439bc..5aa28d08 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ Additional features are: * hash: Faster replacement for `md5`, support for SHA-1, SHA-256, and SHA-512. Requires OpenSSL on Linux. * pathfinder: An a* pathfinder used for finding the shortest path in a static node map. Not to be used for a non-static map. * redis_pubsub: Library for sending and receiving messages through Redis. +* redis_reliablequeue: Library for using a reliable queue pattern through Redis. * unzip: Function to download a .zip from a URL and unzip it to a directory. * worleynoise: Function that generates a type of nice looking cellular noise, more expensive than cellularnoise diff --git a/dmsrc/redis-reliablequeue.dm b/dmsrc/redis-reliablequeue.dm new file mode 100644 index 00000000..710eb03e --- /dev/null +++ b/dmsrc/redis-reliablequeue.dm @@ -0,0 +1,38 @@ +/** + * Connects to a given redis server. + * + * Arguments: + * * addr - The address of the server, for example "redis://127.0.0.1/" + */ +#define rustg_redis_connect_rq(addr) RUSTG_CALL(RUST_G, "redis_connect_rq")(addr) +/** + * Disconnects from a previously connected redis server + */ +/proc/rustg_redis_disconnect_rq() return RUSTG_CALL(RUST_G, "redis_disconnect_rq")() +/** + * https://redis.io/commands/lpush/ + * + * Arguments + * * key (string) - The key to use + * * elements (list) - The elements to push, use a list even if there's only one element. + */ +#define rustg_redis_lpush(key, elements) RUSTG_CALL(RUST_G, "redis_lpush")(key, json_encode(elements)) +/** + * https://redis.io/commands/lrange/ + * + * Arguments + * * key (string) - The key to use + * * start (string) - The zero-based index to start retrieving at + * * stop (string) - The zero-based index to stop retrieving at (inclusive) + */ +#define rustg_redis_lrange(key, start, stop) RUSTG_CALL(RUST_G, "redis_lrange")(key, start, stop) +/** + * https://redis.io/commands/lpop/ + * + * Arguments + * * key (string) - The key to use + * * count (string|null) - The amount to pop off the list, pass null to omit (thus just 1) + * + * Note: `count` was added in Redis version 6.2.0 + */ +#define rustg_redis_lpop(key, count) RUSTG_CALL(RUST_G, "redis_lpop")(key, count) diff --git a/src/acreplace.rs b/src/acreplace.rs index 365ce404..e05e511f 100644 --- a/src/acreplace.rs +++ b/src/acreplace.rs @@ -20,7 +20,11 @@ struct AhoCorasickOptions { impl AhoCorasickOptions { fn auto_configure_and_build(&self, patterns: &[String]) -> AhoCorasick { AhoCorasickBuilder::new() - .start_kind(if self.anchored { StartKind::Anchored } else { StartKind::Unanchored }) + .start_kind(if self.anchored { + StartKind::Anchored + } else { + StartKind::Unanchored + }) .ascii_case_insensitive(self.ascii_case_insensitive) .match_kind(self.match_kind) .build(patterns) diff --git a/src/cellularnoise.rs b/src/cellularnoise.rs index fcd2bcb5..ccc17885 100644 --- a/src/cellularnoise.rs +++ b/src/cellularnoise.rs @@ -27,7 +27,6 @@ fn noise_gen( .map(|x| { let mut rng = rand::thread_rng(); (0..height + 3) - .into_iter() .map(|y| { if x == 0 || y == 0 || x == width + 2 || y == height + 2 { return false; @@ -39,12 +38,11 @@ fn noise_gen( .collect::>>(); //then we smoothe it - (0..smoothing_level).into_iter().for_each(|_| { + (0..smoothing_level).for_each(|_| { let replace_vec = (0..width + 3) .into_par_iter() .map(|x| { (0..height + 3) - .into_iter() .map(|y| { if x == 0 || y == 0 || x == width + 2 || y == height + 2 { return false; @@ -77,7 +75,6 @@ fn noise_gen( .into_par_iter() .map(|x| { (1..height + 1) - .into_iter() .map(|y| filled_vec[x][y]) .collect::>() }) diff --git a/src/lib.rs b/src/lib.rs index bbe40556..8b9fa90a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,8 @@ pub mod noise_gen; pub mod pathfinder; #[cfg(feature = "redis_pubsub")] pub mod redis_pubsub; +#[cfg(feature = "redis_reliablequeue")] +pub mod redis_reliablequeue; #[cfg(feature = "sql")] pub mod sql; #[cfg(feature = "time")] diff --git a/src/redis_reliablequeue.rs b/src/redis_reliablequeue.rs new file mode 100644 index 00000000..cc2cca70 --- /dev/null +++ b/src/redis_reliablequeue.rs @@ -0,0 +1,172 @@ +use redis::{Client, Commands, RedisError}; +use std::cell::RefCell; +use std::num::NonZeroUsize; +use std::time::Duration; + +thread_local! { + static REDIS_CLIENT: RefCell> = RefCell::new(None); +} + +fn connect(addr: &str) -> Result<(), RedisError> { + let client = redis::Client::open(addr)?; + let _ = client.get_connection_with_timeout(Duration::from_secs(1))?; + REDIS_CLIENT.with(|cli| cli.replace(Some(client))); + Ok(()) +} + +fn disconnect() { + // Drop the client + REDIS_CLIENT.with(|client| { + client.replace(None); + }); +} + +/// +fn lpush(key: &str, data: serde_json::Value) -> serde_json::Value { + REDIS_CLIENT.with(|client| { + let client_ref = client.borrow(); + if let Some(client) = client_ref.as_ref() { + return match client.get_connection() { + Ok(mut conn) => { + // Need to handle the case of `[{}, {}]` and `{}` + let result = match data { + serde_json::Value::Null => return serde_json::json!( + {"success": false, "content": format!("Failed to perform LPUSH operation: Data sent was null")} + ), + serde_json::Value::Bool(_) | + serde_json::Value::Number(_) | + serde_json::Value::String(_) | + serde_json::Value::Object(_) => conn.lpush::<&str, String, isize>(key, data.to_string()), + serde_json::Value::Array(arr) => conn.lpush::<&str, Vec, isize>(key, map_jvalues_to_strings(&arr)), + }; + return match result { + Ok(res) => serde_json::json!( + {"success": true, "content": res} + ), + Err(e) => serde_json::json!( + {"success": false, "content": format!("Failed to perform LPUSH operation: {e}")} + ), + }; + }, + Err(e) => { + serde_json::json!( + {"success": false, "content": format!("Failed to get connection: {e}")} + ) + } + } + } + serde_json::json!({ + "success": false, "content": "Not Connected" + }) + }) +} + +fn map_jvalues_to_strings(values: &[serde_json::Value]) -> Vec { + values.iter().map(|value| value.to_string()).collect() +} + +/// +fn lrange(key: &str, start: isize, stop: isize) -> serde_json::Value { + REDIS_CLIENT.with(|client| { + let client_ref = client.borrow(); + if let Some(client) = client_ref.as_ref() { + return match client.get_connection() { + Ok(mut conn) => match conn.lrange::<&str, Vec>(key, start, stop) { + Ok(res) => serde_json::json!( + {"success": true, "content": res} + ), + Err(e) => serde_json::json!( + {"success": false, "content": format!("Failed to perform LRANGE operation: {e}")} + ), + }, + Err(e) => + serde_json::json!( + {"success": false, "content": format!("Failed to get connection: {e}")} + ), + } + } + serde_json::json!( + {"success": false, "content": "Not Connected"} + ) + }) +} + +/// +fn lpop(key: &str, count: Option) -> serde_json::Value { + REDIS_CLIENT.with(|client| { + let client_ref = client.borrow(); + if let Some(client) = client_ref.as_ref() { + let mut conn = match client.get_connection() { + Ok(conn) => conn, + Err(e) => { + return serde_json::json!({ + "success": false, "content": format!("Failed to get connection: {e}") + }) + } + }; + // It will return either an Array or a BulkStr per ref + // Yes, this code could be written more tersely but it's more intensive + match count { + None => { + let result = conn.lpop::<&str, String>(key, count); + return match result { + Ok(res) => serde_json::json!({ + "success": true, "content": res + }), + Err(e) => serde_json::json!({ + "success": false, "content": format!("Failed to perform LPOP operation: {e}") + }), + }; + } + Some(_) => { + let result = conn.lpop::<&str, Vec>(key, count); + return match result { + Ok(res) => serde_json::json!({ + "success": true, "content": res + }), + Err(e) => serde_json::json!({ + "success": false, "content": format!("Failed to perform LPOP operation: {e}") + }), + }; + } + }; + } + serde_json::json!({ + "success": false, "content": "Not Connected" + }) + }) +} + +byond_fn!(fn redis_connect_rq(addr) { + connect(addr).err().map(|e| e.to_string()) +}); + +byond_fn!( + fn redis_disconnect_rq() { + disconnect(); + Some("") + } +); + +byond_fn!(fn redis_lpush(key, elements) { + #[allow(clippy::needless_return)] + return match serde_json::from_str(elements) { + Ok(elem) => Some(lpush(key, elem).to_string()), + Err(e) => Some(serde_json::json!({ + "success": false, "content": format!("Failed to deserialize JSON: {e}") + }).to_string()), + }; +}); + +byond_fn!(fn redis_lrange(key, start, stop) { + Some(lrange(key, start.parse().unwrap_or(0), stop.parse().unwrap_or(-1)).to_string()) +}); + +byond_fn!(fn redis_lpop(key, count) { + let count_parsed = if count.is_empty() { + 0 + } else { + count.parse().unwrap_or(0) + }; + Some(lpop(key, std::num::NonZeroUsize::new(count_parsed)).to_string()) +});