@@ -14,6 +14,8 @@ use rustls::ClientConfig;
14
14
use tokio:: select;
15
15
use tokio:: sync:: mpsc;
16
16
use tokio:: time:: { self , Instant } ;
17
+ use tracing:: field:: display;
18
+ use tracing:: { debug, info, instrument, warn, Span } ;
17
19
18
20
use self :: connection:: { Connection , Connector } ;
19
21
pub use self :: depot:: Depot ;
@@ -138,6 +140,7 @@ impl Session {
138
140
}
139
141
}
140
142
143
+ #[ instrument( name = "serve" , skip_all, fields( session = display( self . session. id) ) ) ]
141
144
pub async fn serve (
142
145
& mut self ,
143
146
mut endpoints : IterableEndpoints ,
@@ -154,7 +157,7 @@ impl Session {
154
157
while !self . session_state . is_terminated ( ) {
155
158
let conn = match self . start ( & mut endpoints, & mut buf, & mut connecting_trans) . await {
156
159
Err ( err) => {
157
- log :: warn!( "fail to connect to cluster {:?} due to {}" , endpoints. endpoints( ) , err) ;
160
+ warn ! ( "fail to connect to cluster {:?} due to {}" , endpoints. endpoints( ) , err) ;
158
161
self . resolve_start_error ( & err) ;
159
162
break ;
160
163
} ,
@@ -212,7 +215,7 @@ impl Session {
212
215
) {
213
216
if let Err ( err) = self . serve_session ( endpoints, & mut conn, buf, depot, requester, unwatch_requester) . await {
214
217
self . resolve_serve_error ( & err) ;
215
- log :: info!( "ZooKeeper session {} state {} error {}" , self . session . id , self . session_state, err) ;
218
+ info ! ( "enter state {} due to error {}" , self . session_state, err) ;
216
219
depot. error ( & err) ;
217
220
} else {
218
221
self . change_state ( SessionState :: Disconnected ) ;
@@ -229,7 +232,7 @@ impl Session {
229
232
fn handle_session_failure ( & mut self , operation : SessionOperation , err : Error , depot : & mut Depot ) {
230
233
let SessionOperation { responser, request, .. } = operation;
231
234
let info = request. get_operation_info ( ) ;
232
- log :: debug!( "ZooKeeper operation unknown failure: {:?}, {:?}" , info, err) ;
235
+ debug ! ( "unknown operation failure: {:?}, {:?}" , info, err) ;
233
236
match info {
234
237
( op_code, OpStat :: Watch { path, mode } ) if op_code != OpCode :: RemoveWatches => depot. fail_watch ( path, mode) ,
235
238
_ => { } ,
@@ -244,7 +247,7 @@ impl Session {
244
247
depot : & mut Depot ,
245
248
) -> ( OpCode , WatchReceiver ) {
246
249
let info = request. get_operation_info ( ) ;
247
- log :: debug!( "ZooKeeper operation get reply: {:?}, {:?}" , info, error_code) ;
250
+ debug ! ( "operation get reply: {:?}, {:?}" , info, error_code) ;
248
251
let ( op_code, path, mode) = match info {
249
252
( op_code, OpStat :: Watch { path, mode } ) if op_code != OpCode :: RemoveWatches => ( op_code, path, mode) ,
250
253
( op_code, _) => return ( op_code, WatchReceiver :: None ) ,
@@ -298,7 +301,7 @@ impl Session {
298
301
depot. pop_ping ( ) ?;
299
302
if let Some ( last_ping) = self . last_ping . take ( ) {
300
303
let elapsed = Instant :: now ( ) - last_ping;
301
- log :: debug!( "ZooKeeper session {} got ping response after {}ms" , self . session . id , elapsed. as_millis( ) ) ;
304
+ debug ! ( "got ping response after {}ms" , elapsed. as_millis( ) ) ;
302
305
}
303
306
return Ok ( ( ) ) ;
304
307
}
@@ -352,13 +355,22 @@ impl Session {
352
355
353
356
fn handle_connect_response ( & mut self , mut body : & [ u8 ] ) -> Result < ( ) , Error > {
354
357
let response = record:: unmarshal :: < ConnectResponse > ( & mut body) ?;
355
- log :: trace !( "Received connect response: {response:?}" ) ;
358
+ debug ! ( "received connect response: {response:?}" ) ;
356
359
if response. session_id == 0 {
357
360
return Err ( Error :: SessionExpired ) ;
358
361
} else if !self . is_readonly_allowed ( ) && response. readonly {
359
362
return Err ( Error :: ConnectionLoss ) ;
360
363
}
361
- self . session . id = SessionId ( response. session_id ) ;
364
+ let session_id = SessionId ( response. session_id ) ;
365
+ if self . session . id != session_id {
366
+ self . session . id = session_id;
367
+ info ! (
368
+ "new session established: {} readonly={}, timeout={}ms" ,
369
+ session_id, response. readonly as bool , response. session_timeout
370
+ ) ;
371
+ let span = Span :: current ( ) ;
372
+ span. record ( "session" , display ( session_id) ) ;
373
+ }
362
374
self . session . password . clear ( ) ;
363
375
self . session . password . extend_from_slice ( response. password ) ;
364
376
self . session . readonly = response. readonly ;
@@ -457,7 +469,7 @@ impl Session {
457
469
select ! {
458
470
Some ( endpoint) = Self :: poll( & mut seek_for_writable) , if seek_for_writable. is_some( ) => {
459
471
seek_for_writable = None ;
460
- log :: debug!( "ZooKeeper succeeds to contact writable server {}" , endpoint) ;
472
+ debug!( "succeeds to contact writable server {}" , endpoint) ;
461
473
channel_halted = true ;
462
474
} ,
463
475
_ = conn. readable( ) => {
@@ -520,7 +532,7 @@ impl Session {
520
532
password : self . session . password . as_slice ( ) ,
521
533
readonly : self . is_readonly_allowed ( ) ,
522
534
} ;
523
- log :: trace !( "Sending connect request: {request:?}" ) ;
535
+ debug ! ( "sending connect request: {request:?}" ) ;
524
536
let operation = ConnectOperation :: new ( & request) ;
525
537
depot. push_operation ( Operation :: Connect ( operation) ) ;
526
538
}
@@ -544,11 +556,11 @@ impl Session {
544
556
} ;
545
557
let mut conn = match self . connector . connect ( endpoint, deadline) . await {
546
558
Err ( err) => {
547
- log :: debug!( "ZooKeeper fails in connecting to {} due to {:?}" , endpoint, err) ;
559
+ debug ! ( "fails in connecting to {} due to {:?}" , endpoint, err) ;
548
560
return Err ( Error :: ConnectionLoss ) ;
549
561
} ,
550
562
Ok ( conn) => {
551
- log :: debug!( "ZooKeeper succeeds in connecting to {}" , endpoint) ;
563
+ debug ! ( "succeeds in connecting to {}" , endpoint) ;
552
564
conn
553
565
} ,
554
566
} ;
@@ -563,11 +575,11 @@ impl Session {
563
575
self . last_ping = None ;
564
576
match self . serve_connecting ( & mut conn, buf, depot) . await {
565
577
Err ( err) => {
566
- log :: warn!( "ZooKeeper fails to establish session to {} due to {}" , endpoint, err) ;
578
+ warn ! ( "fails to establish session to {} due to {}" , endpoint, err) ;
567
579
Err ( err)
568
580
} ,
569
581
_ => {
570
- log :: info!( "ZooKeeper succeeds to establish session({}) to {}" , self . session . id , endpoint) ;
582
+ info ! ( "succeeds to establish session to {}" , endpoint) ;
571
583
Ok ( conn)
572
584
} ,
573
585
}
@@ -581,6 +593,12 @@ impl Session {
581
593
) -> Result < Connection , Error > {
582
594
endpoints. start ( ) ;
583
595
let session_timeout = if self . session . id . 0 == 0 { self . session_timeout } else { self . session_expired_timeout } ;
596
+ debug ! (
597
+ session_timeout = session_timeout. as_millis( ) ,
598
+ connection_timeout = self . connector. timeout( ) . as_millis( ) ,
599
+ "attempts new connections to {:?}" ,
600
+ endpoints. endpoints( )
601
+ ) ;
584
602
let mut deadline = Deadline :: until ( self . last_recv + session_timeout) ;
585
603
let mut last_error = match self . start_once ( endpoints, & mut deadline, buf, depot) . await {
586
604
Err ( err) => err,
0 commit comments