Skip to content

Commit

Permalink
Escape send queued when blocked on connection side
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandrosKyriakakis committed Jan 30, 2024
1 parent 3bf2b87 commit bb33c92
Showing 1 changed file with 32 additions and 10 deletions.
42 changes: 32 additions & 10 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error {

s.toSend = append(s.toSend, msgBytes)

s.notifyMessageOut()

return nil
}

func (s *session) notifyMessageOut() {
select {
case s.messageEvent <- true:
default:
}

return nil
}

// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
Expand Down Expand Up @@ -347,10 +351,23 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
}

func (s *session) sendQueued() {
for _, msgBytes := range s.toSend {
s.sendBytes(msgBytes)
var (
blocked bool
indexBlocked int
)

for i, msgBytes := range s.toSend {
blocked = s.sendBytes(msgBytes)
if blocked {
indexBlocked = i
break
}
}
if blocked {
s.toSend = s.toSend[indexBlocked:]
s.notifyMessageOut()
return
}

s.dropQueued()
}

Expand All @@ -366,15 +383,20 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
s.sendQueued()
}

func (s *session) sendBytes(msg []byte) {
func (s *session) sendBytes(msg []byte) bool {
if s.messageOut == nil {
s.log.OnEventf("Failed to send: disconnected")
return
return false
}

s.log.OnOutgoing(msg)
s.messageOut <- msg
s.stateTimer.Reset(s.HeartBtInt)
select {
case <-time.After(5 * time.Millisecond):
return true
case s.messageOut <- msg:
s.log.OnOutgoing(msg)
s.stateTimer.Reset(s.HeartBtInt)
return false
}
}

func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) {
Expand Down

0 comments on commit bb33c92

Please sign in to comment.