Skip to content

Commit c15c0ed

Browse files
authored
feat(util): new helper type for recovering recorder after installing it (#362)
1 parent f7434bd commit c15c0ed

File tree

9 files changed

+334
-68
lines changed

9 files changed

+334
-68
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ jobs:
4040
- name: Install Rust ${{ matrix.rust_version }}
4141
run: rustup default ${{ matrix.rust_version }}
4242
- name: Run Tests
43-
run: cargo test --all-features --workspace --exclude=metrics-observer
43+
run: cargo test --all-features --workspace --exclude=metrics-observer -- --test-threads=1
4444
docs:
4545
runs-on: ubuntu-latest
4646
env:

metrics-util/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
## [Unreleased] - ReleaseDate
1111

12+
### Added
13+
14+
- Added a new helper type, `RecoverableRecorder`, that allows installing a recorder and then
15+
recovering it later.
16+
1217
## [0.15.0] - 2023-04-16
1318

1419
### Changed

metrics-util/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub use kind::{MetricKind, MetricKindMask};
3131
mod histogram;
3232
pub use histogram::Histogram;
3333

34+
mod recoverable;
35+
pub use recoverable::RecoverableRecorder;
36+
3437
#[cfg(feature = "summary")]
3538
mod summary;
3639
#[cfg(feature = "summary")]

metrics-util/src/recoverable.rs

+320
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
use std::sync::{Arc, Weak};
2+
3+
use metrics::{
4+
Counter, Gauge, Histogram, Key, KeyName, Recorder, SetRecorderError, SharedString, Unit,
5+
};
6+
7+
/// Wraps a recorder to allow for recovering it after being installed.
8+
///
9+
/// Installing a recorder generally involves providing an owned value, which means that it is not
10+
/// possible to recover the recorder after it has been installed. For some recorder implementations,
11+
/// it can be important to perform finalization before the application exits, which is not possible
12+
/// if the application cannot consume the recorder.
13+
///
14+
/// `RecoverableRecorder` allows wrapping a recorder such that a weak reference to it is installed
15+
/// globally, while the recorder itself is held by `RecoverableRecorder`. This allows for recovering
16+
/// the recorder whenever the application chooses.
17+
///
18+
/// ## As a drop guard
19+
///
20+
/// While `RecoverableRecorder` provides a method to manually recover the recorder directly, one
21+
/// particular benefit is that due to how the recorder is wrapped, when `RecoverableRecorder` is
22+
/// dropped, and the last active reference to it is dropped, the recorder itself will be dropped.
23+
///
24+
/// This allows using `RecoverableRecorder` as a drop guard, ensuring that by dropping it, the
25+
/// recorder itself will be dropped, and any finalization logic implemented for the recorder will be
26+
/// run.
27+
pub struct RecoverableRecorder<R> {
28+
recorder: Arc<R>,
29+
}
30+
31+
impl<R: Recorder + 'static> RecoverableRecorder<R> {
32+
/// Creates a new `RecoverableRecorder`, wrapping the given recorder.
33+
///
34+
/// A weakly-referenced version of the recorder is installed globally, while the original
35+
/// recorder is held within `RecoverableRecorder`, and can be recovered by calling `into_inner`.
36+
///
37+
/// # Errors
38+
///
39+
/// If a recorder is already installed, an error is returned.
40+
pub fn from_recorder(recorder: R) -> Result<Self, SetRecorderError> {
41+
let recorder = Arc::new(recorder);
42+
43+
let wrapped = WeakRecorder::from_arc(&recorder);
44+
metrics::set_boxed_recorder(Box::new(wrapped))?;
45+
46+
Ok(Self { recorder })
47+
}
48+
49+
/// Consumes this wrapper, returning the original recorder.
50+
///
51+
/// This method will loop until there are no active weak references to the recorder. It is not
52+
/// advised to call this method under heavy load, as doing so is not deterministic or ordered
53+
/// and may block for an indefinite amount of time.
54+
pub fn into_inner(mut self) -> R {
55+
loop {
56+
match Arc::try_unwrap(self.recorder) {
57+
Ok(recorder) => break recorder,
58+
Err(recorder) => {
59+
self.recorder = recorder;
60+
}
61+
}
62+
}
63+
}
64+
}
65+
66+
struct WeakRecorder<R> {
67+
recorder: Weak<R>,
68+
}
69+
70+
impl<R> WeakRecorder<R> {
71+
fn from_arc(recorder: &Arc<R>) -> Self {
72+
Self { recorder: Arc::downgrade(recorder) }
73+
}
74+
}
75+
76+
impl<R: Recorder> Recorder for WeakRecorder<R> {
77+
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
78+
if let Some(recorder) = self.recorder.upgrade() {
79+
recorder.describe_counter(key, unit, description);
80+
}
81+
}
82+
83+
fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
84+
if let Some(recorder) = self.recorder.upgrade() {
85+
recorder.describe_gauge(key, unit, description);
86+
}
87+
}
88+
89+
fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
90+
if let Some(recorder) = self.recorder.upgrade() {
91+
recorder.describe_histogram(key, unit, description);
92+
}
93+
}
94+
95+
fn register_counter(&self, key: &Key) -> Counter {
96+
if let Some(recorder) = self.recorder.upgrade() {
97+
recorder.register_counter(key)
98+
} else {
99+
Counter::noop()
100+
}
101+
}
102+
103+
fn register_gauge(&self, key: &Key) -> Gauge {
104+
if let Some(recorder) = self.recorder.upgrade() {
105+
recorder.register_gauge(key)
106+
} else {
107+
Gauge::noop()
108+
}
109+
}
110+
111+
fn register_histogram(&self, key: &Key) -> Histogram {
112+
if let Some(recorder) = self.recorder.upgrade() {
113+
recorder.register_histogram(key)
114+
} else {
115+
Histogram::noop()
116+
}
117+
}
118+
}
119+
120+
#[cfg(test)]
121+
mod tests {
122+
use std::sync::atomic::{AtomicBool, Ordering};
123+
124+
use super::*;
125+
use metrics::{atomics::AtomicU64, CounterFn, GaugeFn, HistogramFn, Key, Recorder};
126+
127+
struct CounterWrapper(AtomicU64);
128+
struct GaugeWrapper(AtomicU64);
129+
struct HistogramWrapper(AtomicU64);
130+
131+
impl CounterWrapper {
132+
fn get(&self) -> u64 {
133+
self.0.load(Ordering::Acquire)
134+
}
135+
}
136+
137+
impl GaugeWrapper {
138+
fn get(&self) -> u64 {
139+
self.0.load(Ordering::Acquire)
140+
}
141+
}
142+
143+
impl HistogramWrapper {
144+
fn get(&self) -> u64 {
145+
self.0.load(Ordering::Acquire)
146+
}
147+
}
148+
149+
impl CounterFn for CounterWrapper {
150+
fn increment(&self, value: u64) {
151+
self.0.fetch_add(value, Ordering::Release);
152+
}
153+
154+
fn absolute(&self, value: u64) {
155+
self.0.store(value, Ordering::Release);
156+
}
157+
}
158+
159+
impl GaugeFn for GaugeWrapper {
160+
fn increment(&self, value: f64) {
161+
self.0.fetch_add(value as u64, Ordering::Release);
162+
}
163+
164+
fn decrement(&self, value: f64) {
165+
self.0.fetch_sub(value as u64, Ordering::Release);
166+
}
167+
168+
fn set(&self, value: f64) {
169+
self.0.store(value as u64, Ordering::Release);
170+
}
171+
}
172+
173+
impl HistogramFn for HistogramWrapper {
174+
fn record(&self, value: f64) {
175+
self.0.fetch_add(value as u64, Ordering::Release);
176+
}
177+
}
178+
179+
struct TestRecorder {
180+
dropped: Arc<AtomicBool>,
181+
counter: Arc<CounterWrapper>,
182+
gauge: Arc<GaugeWrapper>,
183+
histogram: Arc<HistogramWrapper>,
184+
}
185+
186+
impl TestRecorder {
187+
fn new() -> (Self, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>) {
188+
let (recorder, _, counter, gauge, histogram) = Self::new_with_drop();
189+
(recorder, counter, gauge, histogram)
190+
}
191+
192+
fn new_with_drop(
193+
) -> (Self, Arc<AtomicBool>, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>)
194+
{
195+
let dropped = Arc::new(AtomicBool::new(false));
196+
let counter = Arc::new(CounterWrapper(AtomicU64::new(0)));
197+
let gauge = Arc::new(GaugeWrapper(AtomicU64::new(0)));
198+
let histogram = Arc::new(HistogramWrapper(AtomicU64::new(0)));
199+
200+
let recorder = Self {
201+
dropped: Arc::clone(&dropped),
202+
counter: Arc::clone(&counter),
203+
gauge: Arc::clone(&gauge),
204+
histogram: Arc::clone(&histogram),
205+
};
206+
207+
(recorder, dropped, counter, gauge, histogram)
208+
}
209+
}
210+
211+
impl Recorder for TestRecorder {
212+
fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
213+
todo!()
214+
}
215+
216+
fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
217+
todo!()
218+
}
219+
220+
fn describe_histogram(
221+
&self,
222+
_key: KeyName,
223+
_unit: Option<Unit>,
224+
_description: SharedString,
225+
) {
226+
todo!()
227+
}
228+
229+
fn register_counter(&self, _: &Key) -> Counter {
230+
Counter::from_arc(Arc::clone(&self.counter))
231+
}
232+
233+
fn register_gauge(&self, _: &Key) -> Gauge {
234+
Gauge::from_arc(Arc::clone(&self.gauge))
235+
}
236+
237+
fn register_histogram(&self, _: &Key) -> Histogram {
238+
Histogram::from_arc(Arc::clone(&self.histogram))
239+
}
240+
}
241+
242+
impl Drop for TestRecorder {
243+
fn drop(&mut self) {
244+
self.dropped.store(true, Ordering::Release);
245+
}
246+
}
247+
248+
#[test]
249+
fn basic() {
250+
// Create and install the recorder.
251+
let (recorder, counter, gauge, histogram) = TestRecorder::new();
252+
unsafe {
253+
metrics::clear_recorder();
254+
}
255+
let recoverable =
256+
RecoverableRecorder::from_recorder(recorder).expect("failed to install recorder");
257+
258+
// Record some metrics, and make sure the atomics for each metric type are
259+
// incremented as we would expect them to be.
260+
metrics::counter!("counter", 5);
261+
metrics::increment_gauge!("gauge", 5.0);
262+
metrics::increment_gauge!("gauge", 5.0);
263+
metrics::histogram!("histogram", 5.0);
264+
metrics::histogram!("histogram", 5.0);
265+
metrics::histogram!("histogram", 5.0);
266+
267+
let _recorder = recoverable.into_inner();
268+
assert_eq!(counter.get(), 5);
269+
assert_eq!(gauge.get(), 10);
270+
assert_eq!(histogram.get(), 15);
271+
272+
// Now that we've recovered the recorder, incrementing the same metrics should
273+
// not actually increment the value of the atomics for each metric type.
274+
metrics::counter!("counter", 7);
275+
metrics::increment_gauge!("gauge", 7.0);
276+
metrics::histogram!("histogram", 7.0);
277+
278+
assert_eq!(counter.get(), 5);
279+
assert_eq!(gauge.get(), 10);
280+
assert_eq!(histogram.get(), 15);
281+
}
282+
283+
#[test]
284+
fn on_drop() {
285+
// Create and install the recorder.
286+
let (recorder, dropped, counter, gauge, histogram) = TestRecorder::new_with_drop();
287+
unsafe {
288+
metrics::clear_recorder();
289+
}
290+
let recoverable =
291+
RecoverableRecorder::from_recorder(recorder).expect("failed to install recorder");
292+
293+
// Record some metrics, and make sure the atomics for each metric type are
294+
// incremented as we would expect them to be.
295+
metrics::counter!("counter", 5);
296+
metrics::increment_gauge!("gauge", 5.0);
297+
metrics::increment_gauge!("gauge", 5.0);
298+
metrics::histogram!("histogram", 5.0);
299+
metrics::histogram!("histogram", 5.0);
300+
metrics::histogram!("histogram", 5.0);
301+
302+
drop(recoverable.into_inner());
303+
assert_eq!(counter.get(), 5);
304+
assert_eq!(gauge.get(), 10);
305+
assert_eq!(histogram.get(), 15);
306+
307+
// Now that we've recovered the recorder, incrementing the same metrics should
308+
// not actually increment the value of the atomics for each metric type.
309+
metrics::counter!("counter", 7);
310+
metrics::increment_gauge!("gauge", 7.0);
311+
metrics::histogram!("histogram", 7.0);
312+
313+
assert_eq!(counter.get(), 5);
314+
assert_eq!(gauge.get(), 10);
315+
assert_eq!(histogram.get(), 15);
316+
317+
// And we should be able to check that the recorder was indeed dropped.
318+
assert!(dropped.load(Ordering::Acquire));
319+
}
320+
}

metrics/Cargo.toml

-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ readme = "README.md"
1616
categories = ["development-tools::debugging"]
1717
keywords = ["metrics", "facade"]
1818

19-
build = "build.rs"
20-
2119
[lib]
2220
bench = false
2321

metrics/benches/macros.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ impl Recorder for TestRecorder {
2424
}
2525

2626
fn reset_recorder() {
27-
let recorder = Box::leak(Box::new(TestRecorder::default()));
28-
unsafe { metrics::set_recorder_racy(recorder).unwrap() }
27+
unsafe {
28+
metrics::clear_recorder();
29+
}
30+
metrics::set_boxed_recorder(Box::new(TestRecorder::default())).unwrap()
2931
}
3032

3133
fn macro_benchmark(c: &mut Criterion) {

metrics/build.rs

-13
This file was deleted.

0 commit comments

Comments
 (0)