Skip to content

Commit a244889

Browse files
authored
Merge pull request #596 from fede1024/scanterog/seek-partitions
Change signature for seek_partitions
2 parents e27049f + 020700d commit a244889

6 files changed

+77
-8
lines changed

changelog.md

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
44

55
## Unreleased
66

7+
* **Breaking change.** Change the signature of `seek_partitions`. Following
8+
librdkafka's behavior, individual partition errors are now reported in the per-partition
9+
`error` field of the returned `TopicPartitionList` elements.
10+
711
## 0.33.0 (2023-06-30)
812

913
* Add interface to specify custom partitioners by extending `ProducerContext`

src/consumer/base_consumer.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,9 @@ where
352352

353353
fn seek_partitions<T: Into<Timeout>>(
354354
&self,
355-
topic_partition_list: &TopicPartitionList,
355+
topic_partition_list: TopicPartitionList,
356356
timeout: T,
357-
) -> KafkaResult<()> {
357+
) -> KafkaResult<TopicPartitionList> {
358358
let ret = unsafe {
359359
RDKafkaError::from_ptr(rdsys::rd_kafka_seek_partitions(
360360
self.client.native_ptr(),
@@ -366,7 +366,7 @@ where
366366
let error = ret.name();
367367
return Err(KafkaError::Seek(error));
368368
}
369-
Ok(())
369+
Ok(topic_partition_list)
370370
}
371371

372372
fn commit(

src/consumer/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -261,11 +261,13 @@ where
261261
/// in the `offset` field of `TopicPartitionListElem`.
262262
/// The offset can be either absolute (>= 0) or a logical offset.
263263
/// Seek should only be performed on already assigned/consumed partitions.
264+
/// Individual partition errors are reported in the per-partition `error` field of
265+
/// `TopicPartitionListElem`.
264266
fn seek_partitions<T: Into<Timeout>>(
265267
&self,
266-
topic_partition_list: &TopicPartitionList,
268+
topic_partition_list: TopicPartitionList,
267269
timeout: T,
268-
) -> KafkaResult<()>;
270+
) -> KafkaResult<TopicPartitionList>;
269271

270272
/// Commits the offset of the specified message. The commit can be sync
271273
/// (blocking), or async. Notice that when a specific offset is committed,

src/consumer/stream_consumer.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -412,9 +412,9 @@ where
412412

413413
fn seek_partitions<T: Into<Timeout>>(
414414
&self,
415-
topic_partition_list: &TopicPartitionList,
415+
topic_partition_list: TopicPartitionList,
416416
timeout: T,
417-
) -> KafkaResult<()> {
417+
) -> KafkaResult<TopicPartitionList> {
418418
self.base.seek_partitions(topic_partition_list, timeout)
419419
}
420420

src/topic_partition_list.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -387,11 +387,12 @@ impl fmt::Debug for TopicPartitionList {
387387
}
388388
write!(
389389
f,
390-
"{}/{}: offset={:?} metadata={:?}",
390+
"{}/{}: offset={:?} metadata={:?}, error={:?}",
391391
elem.topic(),
392392
elem.partition(),
393393
elem.offset(),
394394
elem.metadata(),
395+
elem.error(),
395396
)?;
396397
}
397398
write!(f, "}}")

tests/test_low_consumers.rs

+62
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Test data consumption using low level consumers.
22
33
use std::collections::HashMap;
4+
use std::convert::TryInto;
45
use std::sync::atomic::{AtomicUsize, Ordering};
56
use std::sync::Arc;
67
use std::thread;
@@ -90,6 +91,67 @@ async fn test_produce_consume_seek() {
9091
}
9192
}
9293

94+
// Seeking should allow replaying messages and skipping messages.
95+
#[tokio::test]
96+
async fn test_produce_consume_seek_partitions() {
97+
let _r = env_logger::try_init();
98+
99+
let topic_name = rand_test_topic();
100+
populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await;
101+
102+
let consumer = create_base_consumer(&rand_test_group(), None);
103+
consumer.subscribe(&[topic_name.as_str()]).unwrap();
104+
105+
let mut partition_offset_map = HashMap::new();
106+
for message in consumer.iter().take(30) {
107+
match message {
108+
Ok(m) => {
109+
let offset = partition_offset_map.entry(m.partition()).or_insert(0);
110+
assert_eq!(m.offset(), *offset);
111+
*offset += 1;
112+
}
113+
Err(e) => panic!("Error receiving message: {:?}", e),
114+
}
115+
}
116+
117+
let mut tpl = TopicPartitionList::new();
118+
tpl.add_partition_offset(&topic_name, 0, Offset::Beginning)
119+
.unwrap();
120+
tpl.add_partition_offset(&topic_name, 1, Offset::End)
121+
.unwrap();
122+
tpl.add_partition_offset(&topic_name, 2, Offset::Offset(2))
123+
.unwrap();
124+
125+
let r_tpl = consumer.seek_partitions(tpl, None).unwrap();
126+
assert_eq!(r_tpl.elements().len(), 3);
127+
for tpe in r_tpl.elements().iter() {
128+
assert!(tpe.error().is_ok());
129+
}
130+
131+
let msg_cnt_p0 = partition_offset_map.get(&0).unwrap();
132+
let msg_cnt_p2 = partition_offset_map.get(&2).unwrap();
133+
let total_msgs_to_read = msg_cnt_p0 + (msg_cnt_p2 - 2);
134+
let mut poffset_map = HashMap::new();
135+
for message in consumer.iter().take(total_msgs_to_read.try_into().unwrap()) {
136+
match message {
137+
Ok(m) => {
138+
let offset = poffset_map.entry(m.partition()).or_insert(0);
139+
if m.partition() == 0 {
140+
assert_eq!(m.offset(), *offset);
141+
} else if m.partition() == 2 {
142+
assert_eq!(m.offset(), *offset + 2);
143+
} else if m.partition() == 1 {
144+
panic!("Unexpected message from partition 1")
145+
}
146+
*offset += 1;
147+
}
148+
Err(e) => panic!("Error receiving message: {:?}", e),
149+
}
150+
}
151+
assert_eq!(msg_cnt_p0, poffset_map.get(&0).unwrap());
152+
assert_eq!(msg_cnt_p2 - 2, *poffset_map.get(&2).unwrap());
153+
}
154+
93155
// All produced messages should be consumed.
94156
#[tokio::test]
95157
async fn test_produce_consume_iter() {

0 commit comments

Comments
 (0)