Skip to content

Commit 545713a

Browse files
committed
refactor: using tracing to carry context and log
Closes #38.
1 parent 82d25e5 commit 545713a

File tree

5 files changed

+52
-21
lines changed

5 files changed

+52
-21
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ num_enum = "0.5.6"
2323
ignore-result = "0.2.0"
2424
compact_str = "0.4"
2525
const_format = "0.2.22"
26-
log = "0.4.14"
2726
static_assertions = "1.1.0"
2827
hashbrown = "0.12.0"
2928
hashlink = "0.8.0"
@@ -35,9 +34,10 @@ webpki-roots = "0.26.1"
3534
derive-where = "1.2.7"
3635
tokio-rustls = "0.26.0"
3736
fastrand = "2.0.2"
37+
tracing = "0.1.40"
3838

3939
[dev-dependencies]
40-
test-log = "0.2.12"
40+
test-log = { version = "0.2.15", features = ["log", "trace"] }
4141
env_logger = "0.10.0"
4242
rand = "0.8.4"
4343
pretty_assertions = "1.1.0"

src/client/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use either::{Either, Left, Right};
1111
use ignore_result::Ignore;
1212
use thiserror::Error;
1313
use tokio::sync::{mpsc, watch};
14+
use tracing::field::display;
15+
use tracing::{instrument, Span};
1416

1517
pub use self::watcher::{OneshotWatcher, PersistentWatcher, StateWatcher};
1618
use super::session::{Depot, MarshalledRequest, Session, SessionOperation, WatchReceiver};
@@ -1632,6 +1634,7 @@ impl Connector {
16321634
self
16331635
}
16341636

1637+
#[instrument(name = "connect", skip_all, fields(session))]
16351638
async fn connect_internally(&mut self, secure: bool, cluster: &str) -> Result<Client> {
16361639
let (endpoints, chroot) = endpoint::parse_connect_string(cluster, secure)?;
16371640
if let Some(session) = self.session.as_ref() {
@@ -1641,6 +1644,7 @@ impl Connector {
16411644
None,
16421645
));
16431646
}
1647+
Span::current().record("session", display(session.id()));
16441648
}
16451649
if self.session_timeout < Duration::ZERO {
16461650
return Err(Error::BadArguments(&"session timeout must not be negative"));

src/session/connection.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tokio::net::TcpStream;
1414
use tokio::{select, time};
1515
use tokio_rustls::client::TlsStream;
1616
use tokio_rustls::TlsConnector;
17+
use tracing::{debug, trace};
1718

1819
use crate::deadline::Deadline;
1920
use crate::endpoint::{EndpointRef, IterableEndpoints};
@@ -206,14 +207,21 @@ impl Connector {
206207
let mut deadline = Deadline::never();
207208
while let Some(endpoint) = endpoints.peek() {
208209
i += 1;
209-
if let Ok(conn) = self.connect(endpoint, &mut deadline).await {
210-
if let Ok(true) = conn.command_isro().await {
211-
return Some(unsafe { std::mem::transmute(endpoint) });
212-
}
210+
match self.connect(endpoint, &mut deadline).await {
211+
Ok(conn) => match conn.command_isro().await {
212+
Ok(true) => return Some(unsafe { std::mem::transmute(endpoint) }),
213+
Ok(false) => trace!("succeeds to contact readonly {}", endpoint),
214+
Err(err) => trace!(%err, r#"fails to complete "isro" to {}"#, endpoint),
215+
},
216+
Err(err) => trace!(%err, "fails to contact {}", endpoint),
213217
}
214218
endpoints.step();
215219
if i % n == 0 {
216-
log::debug!("ZooKeeper fails to contact writable server from endpoints {:?}", endpoints.endpoints());
220+
debug!(
221+
sleep = timeout.as_millis(),
222+
"fails to contact writable server from endpoints {:?}",
223+
endpoints.endpoints()
224+
);
217225
time::sleep(timeout).await;
218226
timeout = max_timeout.min(timeout * 2);
219227
} else {

src/session/depot.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::io::{self, IoSlice};
33

44
use hashbrown::HashMap;
55
use strum::IntoEnumIterator;
6+
use tracing::debug;
67

78
use super::connection::Connection;
89
use super::request::{MarshalledRequest, OpStat, Operation, SessionOperation, StateResponser};
@@ -172,7 +173,7 @@ impl Depot {
172173

173174
pub fn push_session(&mut self, operation: SessionOperation) {
174175
let info = operation.request.get_operation_info();
175-
log::debug!("ZooKeeper operation request: {:?}", info);
176+
debug!("sending request: {:?}", info);
176177
if let (op_code, OpStat::Watch { path, mode }) = info {
177178
let path = unsafe { std::mem::transmute::<&str, &'_ str>(path) };
178179
if op_code == OpCode::RemoveWatches {

src/session/mod.rs

+31-13
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use rustls::ClientConfig;
1414
use tokio::select;
1515
use tokio::sync::mpsc;
1616
use tokio::time::{self, Instant};
17+
use tracing::field::display;
18+
use tracing::{debug, info, instrument, warn, Span};
1719

1820
use self::connection::{Connection, Connector};
1921
pub use self::depot::Depot;
@@ -138,6 +140,7 @@ impl Session {
138140
}
139141
}
140142

143+
#[instrument(name = "serve", skip_all, fields(session = display(self.session.id)))]
141144
pub async fn serve(
142145
&mut self,
143146
mut endpoints: IterableEndpoints,
@@ -154,7 +157,7 @@ impl Session {
154157
while !self.session_state.is_terminated() {
155158
let conn = match self.start(&mut endpoints, &mut buf, &mut connecting_trans).await {
156159
Err(err) => {
157-
log::warn!("fail to connect to cluster {:?} due to {}", endpoints.endpoints(), err);
160+
warn!("fail to connect to cluster {:?} due to {}", endpoints.endpoints(), err);
158161
self.resolve_start_error(&err);
159162
break;
160163
},
@@ -212,7 +215,7 @@ impl Session {
212215
) {
213216
if let Err(err) = self.serve_session(endpoints, &mut conn, buf, depot, requester, unwatch_requester).await {
214217
self.resolve_serve_error(&err);
215-
log::info!("ZooKeeper session {} state {} error {}", self.session.id, self.session_state, err);
218+
info!("enter state {} due to error {}", self.session_state, err);
216219
depot.error(&err);
217220
} else {
218221
self.change_state(SessionState::Disconnected);
@@ -229,7 +232,7 @@ impl Session {
229232
fn handle_session_failure(&mut self, operation: SessionOperation, err: Error, depot: &mut Depot) {
230233
let SessionOperation { responser, request, .. } = operation;
231234
let info = request.get_operation_info();
232-
log::debug!("ZooKeeper operation unknown failure: {:?}, {:?}", info, err);
235+
debug!("unknown operation failure: {:?}, {:?}", info, err);
233236
match info {
234237
(op_code, OpStat::Watch { path, mode }) if op_code != OpCode::RemoveWatches => depot.fail_watch(path, mode),
235238
_ => {},
@@ -244,7 +247,7 @@ impl Session {
244247
depot: &mut Depot,
245248
) -> (OpCode, WatchReceiver) {
246249
let info = request.get_operation_info();
247-
log::debug!("ZooKeeper operation get reply: {:?}, {:?}", info, error_code);
250+
debug!("operation get reply: {:?}, {:?}", info, error_code);
248251
let (op_code, path, mode) = match info {
249252
(op_code, OpStat::Watch { path, mode }) if op_code != OpCode::RemoveWatches => (op_code, path, mode),
250253
(op_code, _) => return (op_code, WatchReceiver::None),
@@ -298,7 +301,7 @@ impl Session {
298301
depot.pop_ping()?;
299302
if let Some(last_ping) = self.last_ping.take() {
300303
let elapsed = Instant::now() - last_ping;
301-
log::debug!("ZooKeeper session {} got ping response after {}ms", self.session.id, elapsed.as_millis());
304+
debug!("got ping response after {}ms", elapsed.as_millis());
302305
}
303306
return Ok(());
304307
}
@@ -352,13 +355,22 @@ impl Session {
352355

353356
fn handle_connect_response(&mut self, mut body: &[u8]) -> Result<(), Error> {
354357
let response = record::unmarshal::<ConnectResponse>(&mut body)?;
355-
log::trace!("Received connect response: {response:?}");
358+
debug!("received connect response: {response:?}");
356359
if response.session_id == 0 {
357360
return Err(Error::SessionExpired);
358361
} else if !self.is_readonly_allowed() && response.readonly {
359362
return Err(Error::ConnectionLoss);
360363
}
361-
self.session.id = SessionId(response.session_id);
364+
let session_id = SessionId(response.session_id);
365+
if self.session.id != session_id {
366+
self.session.id = session_id;
367+
info!(
368+
"new session established: {} readonly={}, timeout={}ms",
369+
session_id, response.readonly as bool, response.session_timeout
370+
);
371+
let span = Span::current();
372+
span.record("session", display(session_id));
373+
}
362374
self.session.password.clear();
363375
self.session.password.extend_from_slice(response.password);
364376
self.session.readonly = response.readonly;
@@ -457,7 +469,7 @@ impl Session {
457469
select! {
458470
Some(endpoint) = Self::poll(&mut seek_for_writable), if seek_for_writable.is_some() => {
459471
seek_for_writable = None;
460-
log::debug!("ZooKeeper succeeds to contact writable server {}", endpoint);
472+
debug!("succeeds to contact writable server {}", endpoint);
461473
channel_halted = true;
462474
},
463475
_ = conn.readable() => {
@@ -520,7 +532,7 @@ impl Session {
520532
password: self.session.password.as_slice(),
521533
readonly: self.is_readonly_allowed(),
522534
};
523-
log::trace!("Sending connect request: {request:?}");
535+
debug!("sending connect request: {request:?}");
524536
let operation = ConnectOperation::new(&request);
525537
depot.push_operation(Operation::Connect(operation));
526538
}
@@ -544,11 +556,11 @@ impl Session {
544556
};
545557
let mut conn = match self.connector.connect(endpoint, deadline).await {
546558
Err(err) => {
547-
log::debug!("ZooKeeper fails in connecting to {} due to {:?}", endpoint, err);
559+
debug!("fails in connecting to {} due to {:?}", endpoint, err);
548560
return Err(Error::ConnectionLoss);
549561
},
550562
Ok(conn) => {
551-
log::debug!("ZooKeeper succeeds in connecting to {}", endpoint);
563+
debug!("succeeds in connecting to {}", endpoint);
552564
conn
553565
},
554566
};
@@ -563,11 +575,11 @@ impl Session {
563575
self.last_ping = None;
564576
match self.serve_connecting(&mut conn, buf, depot).await {
565577
Err(err) => {
566-
log::warn!("ZooKeeper fails to establish session to {} due to {}", endpoint, err);
578+
warn!("fails to establish session to {} due to {}", endpoint, err);
567579
Err(err)
568580
},
569581
_ => {
570-
log::info!("ZooKeeper succeeds to establish session({}) to {}", self.session.id, endpoint);
582+
info!("succeeds to establish session to {}", endpoint);
571583
Ok(conn)
572584
},
573585
}
@@ -581,6 +593,12 @@ impl Session {
581593
) -> Result<Connection, Error> {
582594
endpoints.start();
583595
let session_timeout = if self.session.id.0 == 0 { self.session_timeout } else { self.session_expired_timeout };
596+
debug!(
597+
session_timeout = session_timeout.as_millis(),
598+
connection_timeout = self.connector.timeout().as_millis(),
599+
"attempts new connections to {:?}",
600+
endpoints.endpoints()
601+
);
584602
let mut deadline = Deadline::until(self.last_recv + session_timeout);
585603
let mut last_error = match self.start_once(endpoints, &mut deadline, buf, depot).await {
586604
Err(err) => err,

0 commit comments

Comments
 (0)