Skip to content

Commit

Permalink
adding producer examples (#240)
Browse files Browse the repository at this point in the history
* adding producer examples

* Update documentation

Signed-off-by: Gabriele Santomaggio <[email protected]>

---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
DanielePalaia and Gsantomaggio authored Nov 4, 2024
1 parent 30df99f commit c05e420
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 76 deletions.
94 changes: 59 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<br/>
<div align="center">
<strong>
A work-in-progress Rust client for RabbitMQ Streams
A Rust client for RabbitMQ Streams
</strong>
</div>

Expand All @@ -14,7 +14,7 @@
<img src="https://github.com/rabbitmq/rabbitmq-stream-rust-client/workflows/Tests/badge.svg"
alt="Tests status" />
</a>

<a href="https://crates.io/crates/rabbitmq-stream-client">
<img src="https://img.shields.io/crates/d/rabbitmq-stream-client.svg?style=flat-square"
alt="Download" />
Expand All @@ -32,39 +32,53 @@
<a href="https://codecov.io/gh/rabbitmq/rabbitmq-stream-rust-client">
<img src="https://codecov.io/gh/rabbitmq/rabbitmq-stream-rust-client/branch/main/graph/badge.svg?token=2DHIQ20BDE" alt="codecov"/>
</a>
</div>


Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide provides comprehensive information on installation, usage, and examples.

</div>
## Table of Contents
1. [Introduction](#introduction)
2. [Installation](#installation)
3. [Getting Started](#getting-started)
4. [Usage](#usage)
- [Publishing Messages](#publishing-messages)
- [Consuming Messages](#consuming-messages)
- [Super Stream](#super-stream)
5. [Examples](#examples)
6. [Development](#development)
- [Compiling](#Compiling)
- [Running Tests](#running-tests)
- [Running Benchmarks](#running-benchmarks)
- [Contributing](#contributing)
- [License](#license)

## Introduction

# RabbitMQ Stream Client
The RabbitMQ Stream Rust Client is a library designed for integrating Rust applications with RabbitMQ streams efficiently. It supports high throughput and low latency message streaming.

This is a Rust client library for working with [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams).
## Installation

### Installation

Install from [crates.io](https://crates.io/)
Install from [crates.io](https://crates.io/crates/rabbitmq-stream-client)

```toml
[dependencies]
rabbitmq-stream-client = "*"
```

### Quick Start

The main access point is `Environment`, which is used to connect to a node.
Then run `cargo build `to include it in your project.

#### Example
## Getting Started
This section covers the initial setup and necessary steps to incorporate the RabbitMQ Stream client into your Rust application.

##### Building the environment
Ensure RabbitMQ server with stream support is installed.
The main access point is `Environment`, which is used to connect to a node.

```rust,no_run
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
```

##### Building the environment with TLS
### Environment with TLS

```rust,no_run
use rabbitmq_stream_client::Environment;
Expand All @@ -86,7 +100,10 @@ let environment = Environment::builder()
.build()
```

##### Building the environment with a load balancer
### Environment with a load balancer


See the [documentation](https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams#with-a-load-balancer) about the stream and load-balancer.

```rust,no_run
use rabbitmq_stream_client::Environment;
Expand All @@ -99,21 +116,16 @@ let environment = Environment::builder()



##### Publishing messages
## Publishing messages

You can publish messages with three different methods:

* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs)
* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs)
* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs)

```rust,no_run
use rabbitmq_stream_client::{Environment, types::Message};
let environment = Environment::builder().build().await?;
let producer = environment.producer().name("myproducer").build("mystream").await?;
for i in 0..10 {
producer
.send_with_confirm(Message::builder().body(format!("message{}", i)).build())
.await?;
}
producer.close().await?;
```

##### Consuming messages
### Consuming messages

```rust,no_run
use rabbitmq_stream_client::{Environment};
Expand All @@ -136,9 +148,10 @@ sleep(Duration::from_secs(10)).await;
handle.close().await?;
```

### Superstreams

The client supports the superstream functionality.
### Super Stream

The client supports the super-stream functionality.

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.

Expand All @@ -152,16 +165,21 @@ See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-st

See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs)

### Development
### Examples

Refer to the [examples](./examples) directory for detailed code samples illustrating various use cases
like error handling, batch processing, super streams and different ways to send messages.

## Development

#### Compiling
### Compiling

```bash
git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client .
make build
```

#### Running Tests
### Running Tests

To run tests you need to have a running RabbitMQ Stream node with a TLS configuration.
It is mandatory to use `make rabbitmq-server` to create a TLS configuration compatible with the tests.
Expand All @@ -172,9 +190,15 @@ make rabbitmq-server
make test
```

#### Running Benchmarks
### Running Benchmarks

```bash
make rabbitmq-server
make run-benchmark
```

## Contributing
Contributions are welcome! Please read our contributing guide to understand how to submit issues, enhancements, or patches.

## License
This project is licensed under the MIT License. See the LICENSE file for details.
59 changes: 18 additions & 41 deletions examples/batch_send.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
use futures::StreamExt;
use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
types::{ByteCapacity, Message},
Environment,
};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;

println!("creating batch_send stream");
let _ = environment.delete_stream("batch_send").await;
let message_count = 10;
let messages_to_batch = 100;
let iterations = 10000;
environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
Expand All @@ -29,41 +24,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let producer = environment.producer().build("batch_send").await?;

let mut messages = Vec::with_capacity(message_count);
for i in 0..message_count {
let msg = Message::builder().body(format!("message{}", i)).build();
messages.push(msg);
for _ in 0..iterations {
println!("accumulating messages in buffer");
let mut messages = Vec::with_capacity(messages_to_batch);
for i in 0..messages_to_batch {
let msg = Message::builder().body(format!("message{}", i)).build();
messages.push(msg);
}

println!("sending in batch mode");
producer
.batch_send(messages, |confirmation_status| async move {
println!("Message confirmed with status {:?}", confirmation_status);
})
.await?;
}

producer
.batch_send(messages, |confirmation_status| async move {
info!("Message confirmed with status {:?}", confirmation_status);
})
.await?;

producer.close().await?;

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build("batch_send")
.await
.unwrap();

for _ in 0..message_count {
let delivery = consumer.next().await.unwrap()?;
info!(
"Got message : {:?} with offset {}",
delivery
.message()
.data()
.map(|data| String::from_utf8(data.to_vec())),
delivery.offset()
);
}

consumer.handle().close().await.unwrap();

environment.delete_stream("batch_send").await?;
Ok(())
}
69 changes: 69 additions & 0 deletions examples/batch_send_with_tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use futures::StreamExt;
use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
Environment,
};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;

let _ = environment.delete_stream("batch_send").await;
let message_count = 10;
environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
.create("batch_send")
.await?;

let producer = environment.producer().build("batch_send").await?;

let mut messages = Vec::with_capacity(message_count);
for i in 0..message_count {
let msg = Message::builder().body(format!("message{}", i)).build();
messages.push(msg);
}

producer
.batch_send(messages, |confirmation_status| async move {
info!("Message confirmed with status {:?}", confirmation_status);
})
.await?;

producer.close().await?;

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build("batch_send")
.await
.unwrap();

for _ in 0..message_count {
let delivery = consumer.next().await.unwrap()?;
info!(
"Got message : {:?} with offset {}",
delivery
.message()
.data()
.map(|data| String::from_utf8(data.to_vec())),
delivery.offset()
);
}

consumer.handle().close().await.unwrap();

environment.delete_stream("batch_send").await?;
Ok(())
}
Loading

0 comments on commit c05e420

Please sign in to comment.