Skip to content

Commit

Permalink
Merge pull request #25 from endor/add-max-messages
Browse files Browse the repository at this point in the history
feat: allow specifying the maximum number of messages to fetch
  • Loading branch information
kayleg authored Jan 21, 2023
2 parents 4fe1b28 + e0a6550 commit 5814f27
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 5 deletions.
2 changes: 1 addition & 1 deletion examples/long_lived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl FromPubSubMessage for UpdatePacket {
fn schedule_pubsub_pull(subscription: Arc<Subscription>) {
task::spawn(async move {
while subscription.client().is_running() {
match subscription.get_messages::<UpdatePacket>().await {
match subscription.get_messages::<UpdatePacket>(100).await {
Ok(messages) => {
for (result, ack_id) in messages {
match result {
Expand Down
2 changes: 1 addition & 1 deletion examples/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() {
};

let order_sub = Arc::new(pubsub.subscribe(config.pubsub_subscription));
match order_sub.clone().get_messages::<UpdatePacket>().await {
match order_sub.clone().get_messages::<UpdatePacket>(100).await {
Ok(packets) => {
for packet in &packets {
println!("Received: {:?}", packet);
Expand Down
2 changes: 1 addition & 1 deletion examples/singleshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() {
};

let subscription = Arc::new(pubsub.subscribe(config.pubsub_subscription));
match subscription.get_messages::<UpdatePacket>().await {
match subscription.get_messages::<UpdatePacket>(100).await {
Ok(messages) => {
for (result, ack_id) in messages {
match result {
Expand Down
2 changes: 1 addition & 1 deletion examples/subscribe_to_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() {
println!("Subscribed to topic with: {}", sub.name);
let packets = sub
.clone()
.get_messages::<UpdatePacket>()
.get_messages::<UpdatePacket>(100)
.await
.expect("Error Checking PubSub");

Expand Down
3 changes: 2 additions & 1 deletion src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl Subscription {

pub async fn get_messages<T: FromPubSubMessage>(
&self,
max_messages: i32,
) -> Result<Vec<(Result<T, error::Error>, String)>, error::Error> {
let client = self
.client
Expand All @@ -69,7 +70,7 @@ impl Subscription {
.parse()
.unwrap();

let json = r#"{"maxMessages": 100}"#;
let json = format!("{{\"maxMessages\": {}}}", max_messages);

let mut req = client.request(Method::POST, json);
*req.uri_mut() = uri.clone();
Expand Down

0 comments on commit 5814f27

Please sign in to comment.