
Update from upstream c6d9a65a6df8c43fdf93b42bb276b6b7d2ba6c23 #10

Merged
60 commits
46550f7
Allow users to set make when using cmake
joelwachsler Aug 3, 2023
eaee552
Bump librdkafka to 2.3.0.
davidblewett Nov 3, 2023
2990bc1
Add rust-version, so that our MSRV is enforced by cargo:
davidblewett Nov 3, 2023
4753bf6
Derives `serde::Serialize` on `Statistics` (#616)
icdevin Nov 7, 2023
e439b6e
Release rdkafka-sys v4.7.0+2.3.0
scanterog Nov 7, 2023
05de3f7
Release v0.35.0
scanterog Nov 7, 2023
5fbed45
Update release notes
scanterog Nov 7, 2023
e57447f
Move to Event-based API
scanterog Aug 11, 2023
75ff5d8
Adapt the StreamConsumer to poll the underlying BaseConsumer
scanterog Oct 17, 2023
8a42919
Pass arc by value rather than reference and fix generic type.
davidblewett Oct 18, 2023
2d82d40
Refactor to use references and lifetimes rather than Arc.
davidblewett Oct 20, 2023
abc01a8
Work on supporting StreamConsumer via lifetimes instead of Arc.
davidblewett Oct 22, 2023
374855b
Use Arc for events in BorrowMessage
scanterog Oct 24, 2023
d49db5b
Adapt producer Flush to the Event API semantics
scanterog Oct 24, 2023
53cebf9
Explain why the TPL need to be manuallyDrop on the consumer events ha…
scanterog Oct 25, 2023
d5d2bc0
Add comment for no-op method used on RDKafkaMessage impl of the Kafka…
scanterog Oct 25, 2023
ae6e652
Update doc comment for BorrowedMessage::from_dr_event
scanterog Oct 25, 2023
1e5285c
Replace poll with flush on baseProducer drop
scanterog Oct 25, 2023
3e5e24b
StreamConsumer Stream impl fixes for the event API
scanterog Oct 26, 2023
3255470
Consumer needs to read from earliest otherwise consumer will never re…
scanterog Oct 26, 2023
9a2d7d5
Poll should not return None if timeout has not been reached
scanterog Oct 26, 2023
7b20a5d
Cargo clippy
scanterog Oct 26, 2023
bf3914f
Propagate errors for the consumer
scanterog Oct 27, 2023
990a6b2
Adapt commit_transaction to the event api
scanterog Oct 27, 2023
8ed896b
Adapt consumer close to the event api
scanterog Oct 30, 2023
207dd38
Allow creating a consumer without group.id
scanterog Oct 30, 2023
0bae0d9
Do not panic on transient errors on test_consume_partition_order
scanterog Oct 31, 2023
2570ae2
Expose a close_queue and closed methods
scanterog Oct 30, 2023
004ad40
Use closed and close_queue methods on drop
scanterog Nov 6, 2023
51b24cd
Propagate fatal errors
scanterog Nov 6, 2023
8437073
Fix op timeout computation logic on poll_queue
scanterog Nov 7, 2023
c356f05
Release v0.36.0.
davidblewett Nov 8, 2023
ecbc83a
Use a short timeout instead of now_or_never.
davidblewett Jan 11, 2024
f360f57
Disable valgrind for now, and start up kafka/zk before starting building
davidblewett Jan 11, 2024
ddfc3b4
Avoid topic pollution by prefixing with test name.
davidblewett Jan 11, 2024
f572dcb
Return back to the caller on rebalance events
scanterog Jan 4, 2024
e301b5f
Release v0.36.1.
davidblewett Jan 11, 2024
f0543b6
Add Kafka 3.6 to the integration test matrix.
davidblewett Jan 11, 2024
12d05df
Release v0.36.2.
davidblewett Jan 16, 2024
7834a25
Use `CStr::to_string_lossy` in Base{Consumer,Producer}
Swatinem Dec 19, 2023
8d74526
Fix panic on getting config value from NativeConfig
Magister Jun 13, 2023
b97754b
Remove nul chars from string before parsing to numerical values
KowalczykBartek May 29, 2024
b3a9d35
Fix integration tests
Aug 4, 2024
c2fb318
Fix clippy warnings
fede1024 Aug 4, 2024
c613b37
Fix clone clippy warning
fede1024 Aug 4, 2024
05ba0a8
Impl ToBytes for [u8; N] with generic const
cyborg42 Jul 10, 2024
aaf72de
fix: check for non-zero count before calling slice::from_raw_parts_mu…
hadronzoo Aug 6, 2024
75737f3
Enable the development of custom consumers
manifest Jun 2, 2024
2b05864
Avoid realloc when creating C-style strings
mlowicki Aug 21, 2024
3209fcc
Add support for deleting records to admin client
benesch Aug 31, 2024
cd91574
Remove obsolete utilities
benesch Aug 31, 2024
7e83013
Update README.md
Abdullahsab3 Sep 14, 2024
c01975f
Fix StreamConsumer wakeup races
trtt Mar 30, 2024
e8f786d
to_string_lossy
trtt Aug 29, 2024
c709cee
use from
trtt Aug 29, 2024
bc2cf72
Point submodule to our fork's datadog/main branch.
davidblewett Aug 1, 2024
f86a51b
Update librdkafka to 2.4.0+patches.
davidblewett Aug 1, 2024
78f3361
Bump to librdkafka 2.5.0+patches
davidblewett Aug 22, 2024
0dcce2b
optimise OwnedHeaders::insert
mlowicki Aug 26, 2024
7a68103
Merge branch 'datadog/main' into davidblewett/update-from-upstream-c6…
davidblewett Sep 26, 2024
35 changes: 20 additions & 15 deletions .github/workflows/ci.yml
@@ -7,19 +7,20 @@ on:
branches: [master]

env:
-rust_version: 1.61.0
+rust_version: 1.70.0

jobs:
lint:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v3
-- uses: dtolnay/rust-toolchain@b44cb146d03e8d870c57ab64b80f04586349ca5d
+- uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.rust_version }}
components: rustfmt, clippy
- run: cargo fmt -- --check
-- run: cargo clippy -- -Dwarnings
+- run: cargo clippy --tests -- -Dwarnings
+- run: cargo test --doc

check:
@@ -38,7 +39,8 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
-- uses: dtolnay/rust-toolchain@b44cb146d03e8d870c57ab64b80f04586349ca5d
+- uses: lukka/get-cmake@latest
+- uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.rust_version }}
- run: cargo build --all-targets --verbose --features "${{ matrix.features }}"
@@ -50,41 +52,44 @@
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v3
-- uses: dtolnay/rust-toolchain@b44cb146d03e8d870c57ab64b80f04586349ca5d
+- uses: dtolnay/rust-toolchain@stable
with:
# The version of this toolchain doesn't matter much. It's only used to
# generate the minimal-versions lockfile, not to actually run `cargo
# check`.
toolchain: nightly
components: rustfmt, clippy
-- uses: dtolnay/rust-toolchain@b44cb146d03e8d870c57ab64b80f04586349ca5d
+- uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.rust_version }}
- run: rustup default ${{ env.rust_version }}
- run: cargo +nightly -Z minimal-versions generate-lockfile
- run: cargo check
# Default features and features that require optional dependencies should be
# explicitly checked.
- run: cargo check --features libz,tokio,tracing

test:
strategy:
fail-fast: false
# The test suite doesn't support concurrent runs.
max-parallel: 1
matrix:
include:
+- confluent-version: 7.7.0
+kafka-version: 3.7
- confluent-version: 7.5.1
kafka-version: 3.6
- confluent-version: 7.5.1
kafka-version: 3.5
-- confluent-version: 5.3.1
-kafka-version: 2.3
-- confluent-version: 5.0.3
-kafka-version: 2.0
-- confluent-version: 4.1.3
-kafka-version: 1.1
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v3
-- uses: dtolnay/rust-toolchain@b44cb146d03e8d870c57ab64b80f04586349ca5d
+- uses: lukka/get-cmake@latest
+- uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.rust_version }}
-- run: sudo apt-get update
-- run: sudo apt-get install -qy valgrind
+# - run: sudo apt-get update
+# - run: sudo apt-get install -qy valgrind # Valgrind currently disabled in testing
- run: ./test_suite.sh
env:
CONFLUENT_VERSION: ${{ matrix.confluent-version }}
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -10,7 +10,7 @@ keywords = ["kafka", "rdkafka"]
categories = ["api-bindings"]
edition = "2018"
exclude = ["Cargo.lock"]
-rust-version = "1.61"
+rust-version = "1.70"

[workspace]
members = ["rdkafka-sys"]
2 changes: 1 addition & 1 deletion README.md
@@ -184,7 +184,7 @@ re-exported as rdkafka features.

### Minimum supported Rust version (MSRV)

-The current minimum supported Rust version (MSRV) is 1.61.0. Note that
+The current minimum supported Rust version (MSRV) is 1.70.0. Note that
bumping the MSRV is not considered a breaking change. Any release of
rust-rdkafka may bump the MSRV.

4 changes: 4 additions & 0 deletions changelog.md
@@ -4,6 +4,10 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

## Unreleased

* Update MSRV to 1.70
* Remove testing for old Kafka versions (before 3.0). Add tests for 3.7.
* Fix test dependency on docker compose.

## 0.36.2 (2024-01-16)

* Update `BaseConsumer::poll` to return `None` when handling rebalance
2 changes: 0 additions & 2 deletions docker-compose.yaml
@@ -1,5 +1,3 @@
-version: '3'
-
services:
kafka:
image: confluentinc/cp-kafka:${CONFLUENT_VERSION:-7.5.1}
2 changes: 1 addition & 1 deletion rdkafka-sys/Cargo.toml
@@ -10,7 +10,7 @@ description = "Native bindings to the librdkafka library"
keywords = ["kafka", "rdkafka"]
categories = ["external-ffi-bindings"]
edition = "2018"
-rust-version = "1.61"
+rust-version = "1.70"

[dependencies]
num_enum = "0.5.0"
2 changes: 1 addition & 1 deletion rdkafka-sys/build.rs
@@ -77,7 +77,7 @@ fn main() {
// Ensure that we are in the right directory
let rdkafkasys_root = Path::new("rdkafka-sys");
if rdkafkasys_root.exists() {
-assert!(env::set_current_dir(&rdkafkasys_root).is_ok());
+assert!(env::set_current_dir(rdkafkasys_root).is_ok());
}
if !Path::new("librdkafka/LICENSE").exists() {
eprintln!("Setting up submodules");
2 changes: 1 addition & 1 deletion rdkafka-sys/src/lib.rs
@@ -100,7 +100,7 @@ extern crate sasl2_sys;
#[cfg(feature = "libz-sys")]
extern crate libz_sys;

-#[cfg(any(feature = "curl-sys", feature = "curl-sys/static-curl"))]
+#[cfg(any(feature = "curl-sys", feature = "curl-static"))]
extern crate curl_sys;

#[cfg(feature = "zstd-sys")]
3 changes: 3 additions & 0 deletions rdkafka-sys/src/types.rs
@@ -78,6 +78,9 @@ pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;
/// Native rdkafka new partitions object.
pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;

/// Native rdkafka delete records object.
pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t;

/// Native rdkafka config resource.
pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;

85 changes: 85 additions & 0 deletions src/admin.rs
@@ -26,6 +26,7 @@ use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
use crate::TopicPartitionList;

//
// ********** ADMIN CLIENT **********
@@ -218,6 +219,53 @@ impl<C: ClientContext> AdminClient<C> {
Ok(rx)
}

/// Deletes records from a topic.
///
/// The provided `offsets` is a topic partition list specifying which
/// records to delete from a list of topic partitions. For each entry in the
/// list, the messages at offsets before the specified offsets (exclusive)
/// in the specified partition will be deleted. Use offset [`Offset::End`]
/// to delete all records in the partition.
///
/// Returns a topic partition list describing the result of the deletion. If
/// the operation succeeded for a partition, the offset for that partition
/// will be set to the post-deletion low-water mark for that partition. If
/// the operation failed for a partition, there will be an error for that
/// partition's entry in the list.
pub fn delete_records(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
match self.delete_records_inner(offsets, opts) {
Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}
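The deletion semantics described in the doc comment above — everything strictly before the per-partition cutoff offset is removed, and the cutoff becomes the new low-water mark — can be illustrated with a toy model. This is a hypothetical sketch for one partition, not part of the crate's API:

```rust
// Toy model of delete-records semantics for a single partition:
// records at offsets < cutoff are deleted (the cutoff itself is
// exclusive of deletion), and the partition's low-water mark —
// the earliest retained offset — becomes the cutoff. The real
// admin call reports this low-water mark in the result TPL.
fn delete_before(log: &mut Vec<i64>, cutoff: i64) -> i64 {
    log.retain(|&off| off >= cutoff);
    cutoff // post-deletion low-water mark
}

fn main() {
    let mut partition_log: Vec<i64> = (0..10).collect();
    let low_water = delete_before(&mut partition_log, 7);
    assert_eq!(low_water, 7);
    assert_eq!(partition_log, vec![7, 8, 9]);
    println!("low-water mark: {low_water}");
}
```

Passing the equivalent of `Offset::End` corresponds to a cutoff past the last record, i.e. deleting everything in the partition.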

fn delete_records_inner(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
let mut err_buf = ErrBuf::new();
let delete_records = unsafe {
NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
}
.ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
unsafe {
rdsys::rd_kafka_DeleteRecords(
self.client.native_ptr(),
&mut delete_records.ptr(),
1,
native_opts.ptr(),
self.queue.ptr(),
);
}
Ok(rx)
}

/// Retrieves the configuration parameters for the specified resources.
///
/// Note that while the API supports describing multiple configurations at
@@ -950,6 +998,43 @@ impl Future for CreatePartitionsFuture {
}
}

//
// Delete records handling
//

type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;

unsafe impl KafkaDrop for RDKafkaDeleteRecords {
const TYPE: &'static str = "delete records";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}

struct DeleteRecordsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteRecordsFuture {
type Output = KafkaResult<TopicPartitionList>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"delete records request received response of incorrect type ({})",
typ
))));
}
let tpl = unsafe {
let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res);
TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl))
};
Poll::Ready(Ok(tpl))
}
}

//
// Describe configs handling
//
38 changes: 32 additions & 6 deletions src/client.rs
@@ -198,6 +198,21 @@ impl NativeClient {
}
}

+pub(crate) enum EventPollResult<T> {
+None,
+EventConsumed,
+Event(T),
+}
+
+impl<T> From<EventPollResult<T>> for Option<T> {
+fn from(val: EventPollResult<T>) -> Self {
+match val {
+EventPollResult::None | EventPollResult::EventConsumed => None,
+EventPollResult::Event(evt) => Some(evt),
+}
+}
+}
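The three-way result added here distinguishes "queue empty" from "an event arrived but was handled internally" (logs, stats, OAuth refresh), which the previous `Option<NativeEvent>` return type could not express — both collapsed to `None`, breaking timeout accounting in polling loops. A standalone sketch of the same pattern, with no rdkafka types involved:

```rust
// Three possible outcomes of polling an event queue: nothing
// arrived, an event was consumed internally, or an event must be
// surfaced to the caller.
enum EventPollResult<T> {
    None,
    EventConsumed,
    Event(T),
}

// Callers that only care about surfaced events can still collapse
// the result into an Option.
impl<T> From<EventPollResult<T>> for Option<T> {
    fn from(val: EventPollResult<T>) -> Self {
        match val {
            EventPollResult::None | EventPollResult::EventConsumed => None,
            EventPollResult::Event(evt) => Some(evt),
        }
    }
}

fn main() {
    // A surfaced event converts to Some(..).
    let surfaced: Option<&str> = EventPollResult::Event("msg").into();
    assert_eq!(surfaced, Some("msg"));

    // An internally consumed event converts to None, but a polling
    // loop can first branch on EventConsumed to know that work was
    // done and the timeout budget should not be charged as idle.
    let consumed: Option<&str> = EventPollResult::<&str>::EventConsumed.into();
    assert_eq!(consumed, None);
    println!("ok");
}
```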

/// A low-level rdkafka client.
///
/// This type is the basis of the consumers and producers in the [`consumer`]
@@ -278,31 +293,42 @@ impl<C: ClientContext> Client<C> {
&self.context
}

-pub(crate) fn poll_event(&self, queue: &NativeQueue, timeout: Timeout) -> Option<NativeEvent> {
+pub(crate) fn poll_event(
+&self,
+queue: &NativeQueue,
+timeout: Timeout,
+) -> EventPollResult<NativeEvent> {
let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) };
if let Some(ev) = event {
let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
match evtype {
-rdsys::RD_KAFKA_EVENT_LOG => self.handle_log_event(ev.ptr()),
-rdsys::RD_KAFKA_EVENT_STATS => self.handle_stats_event(ev.ptr()),
+rdsys::RD_KAFKA_EVENT_LOG => {
+self.handle_log_event(ev.ptr());
+return EventPollResult::EventConsumed;
+}
+rdsys::RD_KAFKA_EVENT_STATS => {
+self.handle_stats_event(ev.ptr());
+return EventPollResult::EventConsumed;
+}
rdsys::RD_KAFKA_EVENT_ERROR => {
// rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR but producer errors gets
// embedded on the ack returned via RD_KAFKA_EVENT_DR. Hence we need to return this event
// for the consumer case in order to return the error to the user.
self.handle_error_event(ev.ptr());
-return Some(ev);
+return EventPollResult::Event(ev);
}
rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => {
if C::ENABLE_REFRESH_OAUTH_TOKEN {
self.handle_oauth_refresh_event(ev.ptr());
}
+return EventPollResult::EventConsumed;
}
_ => {
-return Some(ev);
+return EventPollResult::Event(ev);
}
}
}
-None
+EventPollResult::None
}

fn handle_log_event(&self, event: *mut RDKafkaEvent) {
9 changes: 4 additions & 5 deletions src/config.rs
@@ -23,7 +23,7 @@
//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

use std::collections::HashMap;
-use std::ffi::{CStr, CString};
+use std::ffi::CString;
use std::iter::FromIterator;
use std::os::raw::c_char;
use std::ptr;
@@ -150,10 +150,9 @@ impl NativeClientConfig {
}

// Convert the C string to a Rust string.
-Ok(CStr::from_bytes_with_nul(&buf)
-.unwrap()
-.to_string_lossy()
-.into())
+Ok(String::from_utf8_lossy(&buf)
+.trim_matches(char::from(0))
+.to_string())
}
}
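The old conversion required the buffer to end in exactly one NUL with none in between; `CStr::from_bytes_with_nul(..).unwrap()` panics when the buffer carries extra NUL padding, which appears to be what the "Fix panic on getting config value from NativeConfig" commit addresses. Trimming NULs after a lossy UTF-8 conversion accepts both shapes. A standalone sketch of the new approach:

```rust
// Convert a possibly NUL-padded C-style buffer to a Rust String.
// CStr::from_bytes_with_nul rejects buffers with interior NULs or
// without a single terminating NUL; trimming after a lossy UTF-8
// conversion handles extra padding gracefully.
fn buf_to_string(buf: &[u8]) -> String {
    String::from_utf8_lossy(buf)
        .trim_matches(char::from(0))
        .to_string()
}

fn main() {
    // Exactly one trailing NUL: both old and new code accept this.
    assert_eq!(buf_to_string(b"all\0"), "all");
    // Extra NUL padding: the CStr-based code panicked here.
    assert_eq!(buf_to_string(b"all\0\0\0"), "all");
    println!("ok");
}
```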
