From 8ec85c6c647aa232fccfab3512863344d89b47fd Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 10 Feb 2025 23:57:00 +0000 Subject: [PATCH 1/2] SMP client: dont block on writing to sending queues --- src/Simplex/Messaging/Client.hs | 8 ++++---- tests/AgentTests/SQLiteTests.hs | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index bde663b32..9617d6ad4 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -105,7 +105,7 @@ module Simplex.Messaging.Client where import Control.Applicative ((<|>)) -import Control.Concurrent (ThreadId, forkFinally, killThread, mkWeakThreadId) +import Control.Concurrent (ThreadId, forkFinally, forkIO, killThread, mkWeakThreadId) import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception @@ -1086,11 +1086,11 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do pure [Response entityId $ Left $ PCETransportError e] TBTransmissions s n rs | n > 0 -> do - atomically $ writeTBQueue sndQ (Nothing, s) -- do not expire batched responses + void $ forkIO $ atomically $ writeTBQueue sndQ (Nothing, s) -- do not expire batched responses mapConcurrently (getResponse c Nothing) rs | otherwise -> pure [] TBTransmission s r -> do - atomically $ writeTBQueue sndQ (Nothing, s) + void $ forkIO $ atomically $ writeTBQueue sndQ (Nothing, s) (: []) <$> getResponse c Nothing r -- | Send Protocol command @@ -1112,7 +1112,7 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan Right t | B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg | otherwise -> do - atomically $ writeTBQueue sndQ (Just r, s) + void $ forkIO $ atomically $ writeTBQueue sndQ (Just r, s) response <$> getResponse c tOut r where s diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 1d5667eb2..5ebce037a 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -45,8 +45,6 @@ import Simplex.Messaging.Agent.Store.AgentStore import Simplex.Messaging.Agent.Store.SQLite import Simplex.Messaging.Agent.Store.SQLite.Common (DBStore (..), withTransaction') import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB -import Simplex.Messaging.Agent.Store.SQLite.Migrations (appMigrations, getCurrentMigrations) -import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..)) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile (..)) From 2fd5806323709970d1aa255b5c67c60acb8f120b Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 11 Feb 2025 00:10:02 +0000 Subject: [PATCH 2/2] only fork if full --- src/Simplex/Messaging/Client.hs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 9617d6ad4..b1b5bfa53 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -1086,11 +1086,11 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do pure [Response entityId $ Left $ PCETransportError e] TBTransmissions s n rs | n > 0 -> do - void $ forkIO $ atomically $ writeTBQueue sndQ (Nothing, s) -- do not expire batched responses + nonBlockingWriteTBQueue sndQ (Nothing, s) -- do not expire batched responses mapConcurrently (getResponse c Nothing) rs | otherwise -> pure [] TBTransmission s r -> do - void $ forkIO $ atomically $ writeTBQueue sndQ (Nothing, s) + nonBlockingWriteTBQueue sndQ (Nothing, s) (: []) <$> getResponse c Nothing r -- | Send Protocol command @@ -1112,13 +1112,18 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan Right t | B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg | otherwise -> do - void $ forkIO $ atomically $ writeTBQueue sndQ (Just r, s) + nonBlockingWriteTBQueue sndQ (Just r, s) response <$> getResponse c tOut r where s | batch = tEncodeBatch1 t | otherwise = tEncode t +nonBlockingWriteTBQueue :: TBQueue a -> a -> IO () +nonBlockingWriteTBQueue q x = do + sent <- atomically $ ifM (isFullTBQueue q) (pure False) (writeTBQueue q x $> True) + unless sent $ void $ forkIO $ atomically $ writeTBQueue q x + getResponse :: ProtocolClient v err msg -> Maybe Int -> Request err msg -> IO (Response err msg) getResponse ProtocolClient {client_ = PClient {tcpTimeout, timeoutErrorCount}} tOut Request {entityId, pending, responseVar} = do r <- fromMaybe tcpTimeout tOut `timeout` atomically (takeTMVar responseVar)