Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improving filtering examples + others #242

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 38 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide pro
- [Publishing Messages](#publishing-messages)
- [Consuming Messages](#consuming-messages)
- [Super Stream](#super-stream)
- [Filtering](#filtering)
5. [Examples](#examples)
6. [Development](#development)
- [Compiling](#Compiling)
Expand Down Expand Up @@ -121,35 +122,31 @@ let environment = Environment::builder()
You can publish messages with three different methods:

* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs)
* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs)
* `batch_send`: asynchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs)
* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs)


### Consuming messages
## Consuming messages

```rust,no_run
use rabbitmq_stream_client::{Environment};
use futures::StreamExt;
use tokio::task;
use tokio::time::{sleep, Duration};
let environment = Environment::builder().build().await?;
let mut consumer = environment.consumer().build("mystream").await?;
let handle = consumer.handle();
task::spawn(async move {
while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();
println!("Got message: {:#?} with offset: {}",
d.message().data().map(|data| String::from_utf8(data.to_vec()).unwrap()),
d.offset(),);
}
});
// wait 10 second and then close the consumer
sleep(Duration::from_secs(10)).await;
handle.close().await?;
```
As streams never delete any messages, any consumer can start reading/consuming from any point in the log

See the Consuming section part of the streaming doc for further info (Most of the examples refer to Java but applies for ths library too):

[Consuming messages from a stream](https://www.rabbitmq.com/docs/streams#consuming)

See also the Rust streaming tutorial-2 on how consume messages starting from different positions and how to enable Server-Side Offset Tracking too:

[RabbitMQ Streams - Rust tutorial 2](https://www.rabbitmq.com/tutorials/tutorial-two-rust-stream)

and the relative examples from the tutorials:

[Rust tutorials examples](https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/rust-stream)

See also a simple example here on how to consume from a stream:

[Consuming messages from a stream example](./examples/simple-consume.rs)

### Super Stream
## Super Stream

The client supports the super-stream functionality.

Expand All @@ -161,9 +158,25 @@ You can use SuperStreamProducer and SuperStreamConsumer classes which internally

Have a look to the examples to see on how to work with super streams.

See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/send_super_stream.rs)
See the [Super Stream Producer Example](./examples/superstreams/send_super_stream.rs)

See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs)


## Filtering

Filtering is a new streaming feature enabled from RabbitMQ 3.13 based on Bloom filter. RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages.

See the Java documentation for more details (Same concepts apply here):

[Filtering - Java Doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#filtering)

See Rust filtering examples:

See the [Producer with filtering Example](./examples/filtering/send_with_filtering.rs)

See the [Consumer with filtering Example](./examples/filtering/receive_with_filtering.rs)

See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs)

### Examples

Expand Down
84 changes: 0 additions & 84 deletions examples/filtering.rs

This file was deleted.

78 changes: 78 additions & 0 deletions examples/filtering/receive_with_filtering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* Receives just Messages where filter California is applied.
This is assured by having added to the vector filter_values of FilterConfiguration the value California
and by the post_filter function to skip false positives
*/

use futures::StreamExt;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::ResponseCode;
use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification};
use rabbitmq_stream_client::{Environment, FilterConfiguration};
use std::convert::TryInto;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream = "test_stream_filtering";
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}

// filter configuration: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
// We are telling the Consumer to ask the server just messages with filter California
// The post_filler is Optional and needed to skip false positives
let filter_configuration = FilterConfiguration::new(vec!["California".to_string()], false)
.post_filter(|message| {
let region: String = message
.application_properties()
.unwrap()
.get("region")
.unwrap()
.clone()
.try_into()
.unwrap();

region == "California".to_string()
});

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.filter_input(Some(filter_configuration))
.build(stream)
.await
.unwrap();

// Just Messages with filter California will appear
while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();
println!(
"Got message : {:?} with offset {}",
d.message()
.data()
.map(|data| String::from_utf8(data.to_vec())),
d.offset()
);
}

Ok(())
}
121 changes: 121 additions & 0 deletions examples/filtering/send_with_filtering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/* Send 100 messages with filter of Region "California" and 100 messages with filter "Texas"
Filters are specified in the application_properties of the messages using the custom field "Region"
in the filter_value_extractor callback
The main thread wait on a condition variable until all the messages have been confirmed */

use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::ResponseCode;
use rabbitmq_stream_client::types::{ByteCapacity, Message};
use rabbitmq_stream_client::Environment;
use std::convert::TryInto;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

// This callback instruct the Producer on what filter we want to apply
// In this case we are returning the value of the application_property "region" value
fn filter_value_extractor(message: &Message) -> String {
message
.application_properties()
.unwrap()
.get("region")
.unwrap()
.clone()
.try_into()
.unwrap()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let confirmed_messages = Arc::new(AtomicU32::new(0));
let notify_on_send = Arc::new(Notify::new());
let stream = "test_stream_filtering";

let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;

let message_count = 200;
let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(&stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}

let mut producer = environment
.producer()
// we are telling the producer to use the callback filter_value_extractor to compute the filter
.filter_value_extractor(filter_value_extractor)
.build("test_stream_filtering")
.await?;

// Sending first 200 messages with filter California
for i in 0..message_count {
let counter = confirmed_messages.clone();
let notifier = notify_on_send.clone();

let msg = Message::builder()
.body(format!("super stream message_{}", i))
.application_properties()
.insert("region", "California")
.message_builder()
.build();

producer
.send(msg, move |_| {
let inner_counter = counter.clone();
let inner_notifier = notifier.clone();
async move {
if inner_counter.fetch_add(1, Ordering::Relaxed) == (message_count * 2) - 1 {
inner_notifier.notify_one();
}
}
})
.await
.unwrap();
}

// Sending 200 messages with filter Texas
for i in 0..message_count {
let counter = confirmed_messages.clone();
let notifier = notify_on_send.clone();
let msg = Message::builder()
.body(format!("super stream message_{}", i))
.application_properties()
.insert("region", "Texas")
.message_builder()
.build();

producer
.send(msg, move |_| {
let inner_counter = counter.clone();
let inner_notifier = notifier.clone();
async move {
if inner_counter.fetch_add(1, Ordering::Relaxed) == (message_count * 2) - 1 {
inner_notifier.notify_one();
}
}
})
.await
.unwrap();
}

notify_on_send.notified().await;
producer.close().await?;

Ok(())
}
Loading
Loading