@@ -215,7 +215,7 @@ impl Session {
215
215
) {
216
216
if let Err ( err) = self . serve_session ( endpoints, & mut conn, buf, depot, requester, unwatch_requester) . await {
217
217
self . resolve_serve_error ( & err) ;
218
- info ! ( "enter state {} due to error {}" , self . session_state, err) ;
218
+ info ! ( "enter state {} due to {}" , self . session_state, err) ;
219
219
depot. error ( & err) ;
220
220
} else {
221
221
self . change_state ( SessionState :: Disconnected ) ;
@@ -356,12 +356,15 @@ impl Session {
356
356
fn handle_connect_response ( & mut self , mut body : & [ u8 ] ) -> Result < ( ) , Error > {
357
357
let response = record:: unmarshal :: < ConnectResponse > ( & mut body) ?;
358
358
debug ! ( "received connect response: {response:?}" ) ;
359
- if response. session_id == 0 {
359
+ let session_id = SessionId ( response. session_id ) ;
360
+ if session_id. 0 == 0 {
360
361
return Err ( Error :: SessionExpired ) ;
361
362
} else if !self . is_readonly_allowed ( ) && response. readonly {
362
- return Err ( Error :: ConnectionLoss ) ;
363
+ return Err ( Error :: UnexpectedError ( format ! (
364
+ "get readonly session {}, while is not allowed since readonly={}, old session readonly={}" ,
365
+ session_id, self . readonly, self . session. readonly
366
+ ) ) ) ;
363
367
}
364
- let session_id = SessionId ( response. session_id ) ;
365
368
if self . session . id != session_id {
366
369
self . session . id = session_id;
367
370
info ! (
@@ -434,7 +437,7 @@ impl Session {
434
437
} ,
435
438
now = tick. tick( ) => {
436
439
if now >= self . last_recv + self . connector. timeout( ) {
437
- return Err ( Error :: ConnectionLoss ) ;
440
+ return Err ( Error :: new_other ( format! ( "no response from connection in {}ms" , self . connector . timeout ( ) . as_millis ( ) ) , None ) ) ;
438
441
}
439
442
} ,
440
443
}
@@ -462,14 +465,14 @@ impl Session {
462
465
if self . session . readonly { Some ( self . connector . clone ( ) . seek_for_writable ( endpoints) ) } else { None } ;
463
466
let mut tick = time:: interval ( self . tick_timeout ) ;
464
467
tick. set_missed_tick_behavior ( time:: MissedTickBehavior :: Skip ) ;
465
- let mut channel_closed = false ;
468
+ let mut err = None ;
466
469
let mut channel_halted = false ;
467
470
depot. start ( ) ;
468
471
while !( channel_halted && depot. is_empty ( ) && !conn. wants_write ( ) ) {
469
472
select ! {
470
473
Some ( endpoint) = Self :: poll( & mut seek_for_writable) , if seek_for_writable. is_some( ) => {
471
474
seek_for_writable = None ;
472
- debug !( "succeeds to contact writable server {}" , endpoint) ;
475
+ err = Some ( Error :: new_other ( format !( "encounter writable server {}" , endpoint) , None ) ) ;
473
476
channel_halted = true ;
474
477
} ,
475
478
_ = conn. readable( ) => {
@@ -488,7 +491,6 @@ impl Session {
488
491
depot. push_session( SessionOperation :: new_without_body( OpCode :: CloseSession ) ) ;
489
492
}
490
493
channel_halted = true ;
491
- channel_closed = true ;
492
494
continue ;
493
495
} ;
494
496
depot. push_session( operation) ;
@@ -500,7 +502,7 @@ impl Session {
500
502
} ,
501
503
now = tick. tick( ) => {
502
504
if now >= self . last_recv + self . connector. timeout( ) {
503
- return Err ( Error :: ConnectionLoss ) ;
505
+ return Err ( Error :: new_other ( format! ( "no response from connection in {}ms" , self . connector . timeout ( ) . as_millis ( ) ) , None ) ) ;
504
506
}
505
507
if self . last_ping. is_none( ) && now >= self . last_send + self . ping_timeout {
506
508
self . send_ping( depot, now) ;
@@ -509,11 +511,7 @@ impl Session {
509
511
} ,
510
512
}
511
513
}
512
- if channel_closed {
513
- Err ( Error :: ClientClosed )
514
- } else {
515
- Err ( Error :: ConnectionLoss )
516
- }
514
+ Err ( err. unwrap_or ( Error :: ClientClosed ) )
517
515
}
518
516
519
517
fn send_ping ( & mut self , depot : & mut Depot , now : Instant ) {
0 commit comments