Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): Bump to Rust 1.72.0 #18389

Merged
merged 13 commits into from
Sep 1, 2023
5 changes: 5 additions & 0 deletions lib/vector-buffers/src/test/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ macro_rules! await_timeout {
}};
}

/// Run a future with a temporary directory.
///
/// # Panics
///
/// Will panic if function cannot create a temp directory.
pub async fn with_temp_dir<F, Fut, V>(f: F) -> V
where
F: FnOnce(&Path) -> Fut,
Expand Down
1 change: 1 addition & 0 deletions lib/vector-buffers/src/test/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ macro_rules! message_wrapper {
}

impl EventCount for $id {
#[allow(clippy::redundant_closure_call)]
fn event_count(&self) -> usize {
usize::try_from($event_count(self)).unwrap_or(usize::MAX)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/test/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Variant {
let (sender, receiver) = builder
.build(String::from("benches"), Span::none())
.await
.expect("topology build should not fail");
.unwrap_or_else(|_| unreachable!("topology build should not fail"));

(sender, receiver)
}
Expand Down
16 changes: 13 additions & 3 deletions lib/vector-buffers/src/topology/acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ where
///
/// Acknowledgements should be given by the caller to update the acknowledgement state before
/// trying to get any eligible markers.
///
/// # Panics
///
/// Will panic if adding ack amount overflows.
pub fn add_acknowledgements(&mut self, amount: N) {
self.unclaimed_acks = self
.unclaimed_acks
Expand Down Expand Up @@ -315,6 +319,10 @@ where
///
/// When other pending markers are present, and the given ID is logically behind the next
/// expected marker ID, `Err(MarkerError::MonotonicityViolation)` is returned.
///
/// # Panics
///
/// Will if pending markers is empty when last pending marker is an unknown size.
pub fn add_marker(
&mut self,
id: N,
Expand All @@ -341,7 +349,7 @@ where
let last_marker = self
.pending_markers
.back_mut()
.expect("pending markers should not be empty");
.unwrap_or_else(|| unreachable!("pending markers should not be empty"));

last_marker.len = PendingMarkerLength::Assumed(len);
}
Expand Down Expand Up @@ -425,13 +433,15 @@ where
let PendingMarker { id, data, .. } = self
.pending_markers
.pop_front()
.expect("pending markers cannot be empty");
.unwrap_or_else(|| unreachable!("pending markers cannot be empty"));

if acks_to_claim > N::min_value() {
self.unclaimed_acks = self
.unclaimed_acks
.checked_sub(&acks_to_claim)
.expect("should not be able to claim more acks than are unclaimed");
.unwrap_or_else(|| {
unreachable!("should not be able to claim more acks than are unclaimed")
});
}

self.acked_marker_id = id.wrapping_add(&len.len());
Expand Down
6 changes: 4 additions & 2 deletions lib/vector-buffers/src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ impl<T: Bufferable> TopologyBuilder<T> {
/// This is a convenience method for `vector` as it is used for inter-transform channels, and we
/// can simplifying needing to require callers to do all the boilerplate to create the builder,
/// create the stage, installing buffer usage metrics that aren't required, and so on.
///
#[allow(clippy::print_stderr)]
pub async fn standalone_memory(
max_events: NonZeroUsize,
when_full: WhenFull,
Expand All @@ -193,7 +195,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.expect("should not fail to directly create a memory buffer");
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
Expand Down Expand Up @@ -228,7 +230,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.expect("should not fail to directly create a memory buffer");
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
Expand Down
13 changes: 9 additions & 4 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ impl<T: Bufferable> LimitedSender<T> {
.limiter
.clone()
.acquire_many_owned(permits_required)
.await else {
return Err(SendError(item))
.await
else {
return Err(SendError(item));
};

self.inner
.data
.push((permits, item))
.expect("acquired permits but channel reported being full");
.unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full"));
self.inner.read_waker.notify_one();

trace!("Sent item.");
Expand All @@ -130,6 +131,10 @@ impl<T: Bufferable> LimitedSender<T> {
/// `Err(TrySendError::Disconnected)` be returned with the given `item`. If the channel has
/// insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)` will be
/// returned with the given `item`.
///
/// # Panics
///
/// Will panic if adding ack amount overflows.
pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
// Calculate how many permits we need, and try to acquire them all without waiting.
let permits_required = self.get_required_permits_for_item(&item);
Expand All @@ -151,7 +156,7 @@ impl<T: Bufferable> LimitedSender<T> {
self.inner
.data
.push((permits, item))
.expect("acquired permits but channel reported being full");
.unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full"));
self.inner.read_waker.notify_one();

trace!("Attempt to send item succeeded.");
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/topology/channel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl<T: Bufferable> BufferSender<T> {
sent_to_base = false;
self.overflow
.as_mut()
.expect("overflow must exist")
.unwrap_or_else(|| unreachable!("overflow must exist"))
.send(item)
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ where
return Err(BuildError::InvalidParameter {
param_name: "max_record_size",
reason: "must be less than 2^64 bytes".to_string(),
})
});
};

if max_record_size_converted > max_data_file_size {
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,6 @@ where
)
.field("writer_done", &self.writer_done.load(Ordering::Acquire))
.field("last_flush", &self.last_flush.load())
.finish()
.finish_non_exhaustive()
}
}
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ where
// If there's an error decoding the item, just fall back to the slow path,
// because this file might actually be where we left off, so we don't want
// to incorrectly skip ahead or anything.
break
break;
};

// We have to remove 1 from the event count here because otherwise the ID would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl fmt::Debug for Record {
.field("event_count", &self.event_count)
.field("encoded_len", &self.encoded_len())
.field("archived_len", &self.archived_len())
.finish()
.finish_non_exhaustive()
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/vector-config/src/schema/visitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod scoped_visit;
mod unevaluated;

#[cfg(test)]
mod test;
mod test;

pub use self::human_name::GenerateHumanFriendlyNameVisitor;
pub use self::inline_single::InlineSingleUseReferencesVisitor;
Expand Down
4 changes: 4 additions & 0 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl LogSchema {
///
/// This should only be used where the result will either be cached,
/// or performance isn't critical, since this requires memory allocation.
///
/// # Panics
///
/// Panics if the path in `self.message_key` is invalid.
pub fn owned_message_path(&self) -> OwnedTargetPath {
self.message_key
.path
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl LogEvent {
for field in fields {
let field_path = event_path!(field.as_ref());
let Some(incoming_val) = incoming.remove(field_path) else {
continue
continue;
};
match self.get_mut(field_path) {
None => {
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/lua/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<'a> FromLua<'a> for Event {
from: value.type_name(),
to: "Event",
message: Some("Event should be a Lua table".to_string()),
})
});
};
match (table.raw_get("log")?, table.raw_get("metric")?) {
(LuaValue::Table(log), LuaValue::Nil) => {
Expand Down
4 changes: 4 additions & 0 deletions lib/vector-core/src/event/lua/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub fn table_is_timestamp(t: &LuaTable<'_>) -> LuaResult<bool> {
/// # Errors
///
/// This function will fail if the table is malformed.
///
/// # Panics
///
/// Panics if the resulting timestamp is invalid.
#[allow(clippy::needless_pass_by_value)] // constrained by mlua types
pub fn table_to_timestamp(t: LuaTable<'_>) -> LuaResult<DateTime<Utc>> {
let year = t.raw_get("year")?;
Expand Down
4 changes: 4 additions & 0 deletions lib/vector-core/src/event/metric/arbitrary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ impl Arbitrary for MetricValue {
type Parameters = ();
type Strategy = BoxedStrategy<MetricValue>;

// TODO(jszwedko): clippy allow can be removed once
// https://github.com/proptest-rs/proptest/commit/466d59daeca317f815bb8358e8d981bb9bd9431a is
// released
#[allow(clippy::arc_with_non_send_sync)]
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
let strategy = prop_oneof![
realistic_float().prop_map(|value| MetricValue::Counter { value }),
Expand Down
6 changes: 3 additions & 3 deletions lib/vector-core/src/event/metric/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ impl MetricData {
other.time.interval_ms,
) {
(Some(t1), Some(i1), Some(t2), Some(i2)) => {
let Ok(delta_t) = TryInto::<u32>::try_into(
t1.timestamp_millis().abs_diff(t2.timestamp_millis()),
) else {
let Ok(delta_t) =
TryInto::<u32>::try_into(t1.timestamp_millis().abs_diff(t2.timestamp_millis()))
else {
return false;
};

Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/util/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ mod keys;
pub use all_fields::{all_fields, all_fields_non_object_root, all_metadata_fields};
pub use keys::keys;

use super::Value;
use super::Value;

#[cfg(test)]
mod test {
mod test {
use std::collections::BTreeMap;

use serde_json::Value as JsonValue;
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod log;

use super::Value;
use super::Value;
10 changes: 7 additions & 3 deletions lib/vector-core/src/event/vrl_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,12 +1246,14 @@ mod test {
MetricValue::Counter { value: 1.23 },
);

let validpaths_get = [".name",
let validpaths_get = [
".name",
".namespace",
".timestamp",
".kind",
".tags",
".type"];
".type",
];

let validpaths_set = [".name", ".namespace", ".timestamp", ".kind", ".tags"];

Expand Down Expand Up @@ -1338,7 +1340,9 @@ mod test {
)]))
);

let VrlTarget::Metric { metric, .. } = target else {unreachable!()};
let VrlTarget::Metric { metric, .. } = target else {
unreachable!()
};

// get single value (should be the last one)
assert_eq!(metric.tag_value("foo"), Some("b".into()));
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-core/src/metrics/ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,8 @@ impl AgentDDSketch {
// generally enforced at the source level by converting from cumulative buckets, or
// enforced by the internal structures that hold bucketed data i.e. Vector's internal
// `Histogram` data structure used for collecting histograms from `metrics`.
let count = u32::try_from(bucket.count).expect("count range has already been checked.");
let count = u32::try_from(bucket.count)
.unwrap_or_else(|_| unreachable!("count range has already been checked."));

self.insert_interpolate_bucket(lower, upper, count);
lower = bucket.upper_limit;
Expand Down
4 changes: 3 additions & 1 deletion lib/vector-core/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ impl<S: Sink<Event> + Send + Unpin> EventSink<S> {
fn flush_queue(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
while self.queue.is_some() {
poll_ready_ok!(self.sink.poll_ready_unpin(cx));
let Some(event) = self.next_event() else {break};
let Some(event) = self.next_event() else {
break;
};
if let Err(err) = self.sink.start_send_unpin(event) {
return Poll::Ready(Err(err));
}
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/stream/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ where
// We've got an input batch to process and the service is ready to accept a request.
maybe_ready = poll_fn(|cx| service.poll_ready(cx)), if next_batch.is_some() => {
let mut batch = next_batch.take()
.expect("batch should be populated");
.unwrap_or_else(|| unreachable!("batch should be populated"));

let mut maybe_ready = Some(maybe_ready);
while !batch.is_empty() {
Expand All @@ -155,7 +155,7 @@ where
},
};

let mut req = batch.pop_front().expect("batch should not be empty");
let mut req = batch.pop_front().unwrap_or_else(|| unreachable!("batch should not be empty"));
seq_num += 1;
let request_id = seq_num;

Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod partitioned_batcher;

pub use concurrent_map::ConcurrentMap;
pub use driver::{Driver, DriverResponse};
use futures_unordered_count::FuturesUnorderedCount;
use futures_unordered_count::FuturesUnorderedCount;
pub use partitioned_batcher::{BatcherSettings, ExpirationQueue, PartitionedBatcher};
17 changes: 16 additions & 1 deletion lib/vector-core/src/tls/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ impl TlsSettings {
})
}

/// Returns the identity as PKCS12
///
/// # Panics
///
/// Panics if the identity is invalid.
fn identity(&self) -> Option<ParsedPkcs12_2> {
// This data was test-built previously, so we can just use it
// here and expect the results will not fail. This can all be
Expand All @@ -219,6 +224,11 @@ impl TlsSettings {
})
}

/// Returns the identity as PEM data
///
/// # Panics
///
/// Panics if the identity is missing, invalid, or the authorities to chain are invalid.
pub fn identity_pem(&self) -> Option<(Vec<u8>, Vec<u8>)> {
self.identity().map(|identity| {
let mut cert = identity
Expand All @@ -244,6 +254,11 @@ impl TlsSettings {
})
}

/// Returns the authorities as PEM data
///
/// # Panics
///
/// Panics if the authority is invalid.
pub fn authorities_pem(&self) -> impl Iterator<Item = Vec<u8>> + '_ {
self.authorities.iter().map(|authority| {
authority
Expand Down Expand Up @@ -498,7 +513,7 @@ impl fmt::Debug for TlsSettings {
f.debug_struct("TlsSettings")
.field("verify_certificate", &self.verify_certificate)
.field("verify_hostname", &self.verify_hostname)
.finish()
.finish_non_exhaustive()
}
}

Expand Down
Loading