Skip to content

Commit

Permalink
add message batch sending
Browse files Browse the repository at this point in the history
  • Loading branch information
tzachshabtay committed Jan 19, 2024
1 parent 3bf2b87 commit 551eee4
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
15 changes: 15 additions & 0 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
return session.queueForSend(msg)
}

// SendBatchToTarget sends a batch of messages based on the sessionID. This allows for better mutex contention if we're sending large batches of messages repeatedly.
func SendBatchToTarget(messages []Messagable, sessionID SessionID) error {
session, ok := lookupSession(sessionID)
if !ok {
return errUnknownSession
}
msgs := make([]*Message, 0, len(messages))
for _, m := range messages {
msg := m.ToMessage()
msgs = append(msgs, msg)
}

return session.queueBatchForSend(msgs)
}

// ResetSession resets session's sequence numbers.
func ResetSession(sessionID SessionID) error {
session, ok := lookupSession(sessionID)
Expand Down
37 changes: 37 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,20 @@ func (s *session) resend(msg *Message) bool {

// queueForSend will validate, persist, and queue the message for send.
func (s *session) queueForSend(msg *Message) error {
if err := s.queueForSendLocked(msg); err != nil {
return err
}

select {
case s.messageEvent <- true:
default:
}

return nil
}

// queueForSendLocked is the critical section portion of queueForSend.
func (s *session) queueForSendLocked(msg *Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()

Expand All @@ -235,6 +249,13 @@ func (s *session) queueForSend(msg *Message) error {

s.toSend = append(s.toSend, msgBytes)

return nil
}

// queueBatchForSend will validate, persist, and queue a batch of messages for send.
func (s *session) queueBatchForSend(msgs []*Message) error {
s.queueBatchForSendLocked(msgs)

select {
case s.messageEvent <- true:
default:
Expand All @@ -243,6 +264,22 @@ func (s *session) queueForSend(msg *Message) error {
return nil
}

// queueBatchForSendLocked is the critical section portion of queueBatchForSend.
func (s *session) queueBatchForSendLocked(msgs []*Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()

for _, msg := range msgs {
msgBytes, err := s.prepMessageForSend(msg, nil)
if err != nil {
return err
}
s.toSend = append(s.toSend, msgBytes)
}

return nil
}

// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
func (s *session) send(msg *Message) error {
return s.sendInReplyTo(msg, nil)
Expand Down
11 changes: 11 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,17 @@ func (suite *SessionSendTestSuite) TestQueueForSendAppMessage() {
suite.NextSenderMsgSeqNum(2)
}

func (suite *SessionSendTestSuite) TestQueueBatchForSendAppMessage() {
suite.MockApp.On("ToApp").Return(nil)
require.Nil(suite.T(), suite.queueBatchForSend([]*Message{suite.NewOrderSingle(), suite.NewOrderSingle(), suite.NewOrderSingle()}))

suite.MockApp.AssertExpectations(suite.T())
suite.NoMessageSent()
suite.MessagePersisted(suite.MockApp.lastToApp)
suite.FieldEquals(tagMsgSeqNum, 3, suite.MockApp.lastToApp.Header)
suite.NextSenderMsgSeqNum(4)
}

func (suite *SessionSendTestSuite) TestQueueForSendDoNotSendAppMessage() {
suite.MockApp.On("ToApp").Return(ErrDoNotSend)
suite.Equal(ErrDoNotSend, suite.queueForSend(suite.NewOrderSingle()))
Expand Down

0 comments on commit 551eee4

Please sign in to comment.