Skip to content

Commit

Permalink
preparing DynaNext in client or worker to simplify code and types
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Jul 3, 2024
1 parent 78c9fc1 commit b948b49
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 150 deletions.
55 changes: 39 additions & 16 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ run cconf@ClientConfig{..} conf client = do
{ auxPossibleClientStreams = possibleClientStream ctx
}
clientCore ctx req processResponse = do
strm <- sendRequest ctx scheme authority req
strm <- sendRequest conf ctx scheme authority req
rsp <- getResponse strm
x <- processResponse rsp
adjustRxWindow ctx strm
Expand All @@ -103,7 +103,7 @@ runIO cconf@ClientConfig{..} conf@Config{..} action = do
ctx@Context{..} <- setup cconf conf
let putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
putR req = do
strm <- sendRequest ctx scheme authority req
strm <- sendRequest conf ctx scheme authority req
return (streamNumber strm, strm)
get = getResponse
create = openOddStreamWait ctx
Expand Down Expand Up @@ -151,12 +151,13 @@ runH2 conf ctx runClient = do
runBackgroundThreads = concurrently_ runReceiver runSender

sendRequest
:: Context
:: Config
-> Context
-> Scheme
-> Authority
-> Request
-> IO Stream
sendRequest ctx@Context{..} scheme auth (Request req) = do
sendRequest conf ctx@Context{..} scheme auth (Request req) = do
-- Checking push promises
let hdr0 = outObjHeaders req
method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
Expand All @@ -181,29 +182,51 @@ sendRequest ctx@Context{..} scheme auth (Request req) = do
req' = req{outObjHeaders = hdr2}
-- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
(sid, newstrm) <- openOddStreamWait ctx
sendHeaderBody ctx sid newstrm req'
sendHeaderBody conf ctx sid newstrm req'
return newstrm

sendHeaderBody :: Context -> StreamId -> Stream -> OutObj -> IO ()
sendHeaderBody ctx@Context{..} sid newstrm req = do
mtbq <- case outObjBody req of
OutBodyStreaming strmbdy ->
sendStreaming ctx $ \iface ->
sendHeaderBody :: Config -> Context -> StreamId -> Stream -> OutObj -> IO ()
sendHeaderBody Config{..} ctx@Context{..} sid newstrm OutObj{..} = do
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, closerOrRefresher) <- confPositionReadMaker path
refresh <- case closerOrRefresher of
Closer closer -> timeoutClose threadManager closer
Refresher refresher -> return refresher
let next = fillFileBodyGetNext pread fileoff bytecount refresh
return (Just next, Nothing)
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
return (Just next, Nothing)
OutBodyStreaming strmbdy -> do
q <- sendStreaming ctx $ \iface ->
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
OutBodyStreamingUnmask strmbdy ->
sendStreaming ctx strmbdy
_ -> return Nothing
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingUnmask strmbdy -> do
q <- sendStreaming ctx strmbdy
let next = nextForStreaming q
return (Just next, Just q)
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
writeTVar outputQStreamID (sid + 2)
forkManaged threadManager "H2 client send" $
syncWithSender ctx newstrm (OObj req) mtbq
syncWithSender ctx newstrm (OHeader outObjHeaders mnext outObjTrailers) mtbq
where
nextForStreaming
:: TBQueue StreamingChunk
-> DynaNext
nextForStreaming tbq =
let takeQ = atomically $ tryReadTBQueue tbq
next = fillStreamBodyGetNext takeQ
in next

sendStreaming
:: Context
-> (OutBodyIface -> IO ())
-> IO (Maybe (TBQueue StreamingChunk))
-> IO (TBQueue StreamingChunk)
sendStreaming Context{..} strmbdy = do
tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
forkManagedUnmask threadManager "H2 client sendStreaming" $ \unmask -> do
Expand All @@ -221,7 +244,7 @@ sendStreaming Context{..} strmbdy = do
finished = atomically $ writeTBQueue tbq $ StreamingFinished decCounterOnce
incCounter threadManager
strmbdy iface `finally` finished
return $ Just tbq
return tbq

exchangeSettings :: Context -> IO ()
exchangeSettings Context{..} = do
Expand Down
2 changes: 2 additions & 0 deletions Network/HTTP2/H2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Network.HTTP2.H2 (
module Network.HTTP2.H2.Settings,
module Network.HTTP2.H2.Stream,
module Network.HTTP2.H2.StreamTable,
module Network.HTTP2.H2.Sync,
module Network.HTTP2.H2.Types,
module Network.HTTP2.H2.Window,
) where
Expand All @@ -25,5 +26,6 @@ import Network.HTTP2.H2.Sender
import Network.HTTP2.H2.Settings
import Network.HTTP2.H2.Stream
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Sync
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window
23 changes: 0 additions & 23 deletions Network/HTTP2/H2/Context.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

module Network.HTTP2.H2.Context where

import Control.Concurrent
import Data.IORef
import Network.Control
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception
Expand All @@ -18,7 +16,6 @@ import Imports hiding (insert)
import Network.HPACK
import Network.HTTP2.Frame
import Network.HTTP2.H2.Manager
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Settings
import Network.HTTP2.H2.Stream
import Network.HTTP2.H2.StreamTable
Expand Down Expand Up @@ -306,23 +303,3 @@ openEvenStreamWait ctx@Context{..} = do
newstrm <- newEvenStream sid txws rxws
insertEven' evenStreamTable sid newstrm
return (sid, newstrm)

syncWithSender
:: Context
-> Stream
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
syncWithSender Context{..} strm otyp mtbq = do
var <- newEmptyMVar
enqueueOutput outputQ $ Output strm otyp mtbq (putMVar var)
loop var
where
loop var = do
s <- takeMVar var
case s of
Done -> return ()
Cont wait newotyp -> do
wait
enqueueOutput outputQ $ Output strm newotyp mtbq (putMVar var)
loop var
105 changes: 24 additions & 81 deletions Network/HTTP2/H2/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import Network.HTTP2.Frame
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.EncodeFrame
import Network.HTTP2.H2.HPACK
import Network.HTTP2.H2.Manager hiding (start)
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Settings
import Network.HTTP2.H2.Stream
Expand All @@ -33,12 +32,6 @@ import Network.HTTP2.H2.Window

----------------------------------------------------------------

{-# INLINE waitStreaming #-}
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq = atomically $ do
isEmpty <- isEmptyTBQueue tbq
checkSTM (not isEmpty)

data Switch
= C Control
| O Output
Expand Down Expand Up @@ -72,7 +65,7 @@ updatePeerSettings Context{peerSettings, oddStreamTable, evenStreamTable} peerAl

frameSender :: Context -> Config -> IO ()
frameSender
ctx@Context{outputQ, controlQ, encodeDynamicTable, outputBufferLimit, threadManager}
ctx@Context{outputQ, controlQ, encodeDynamicTable, outputBufferLimit}
Config{..} = do
labelMe "fromSender"
loop 0 `E.catch` wrapException
Expand Down Expand Up @@ -152,26 +145,24 @@ frameSender

----------------------------------------------------------------
outputOrEnqueueAgain :: Output -> Offset -> IO Offset
outputOrEnqueueAgain out@(Output strm otyp mtbq sync) off = E.handle resetStream $ do
outputOrEnqueueAgain out@(Output strm otyp sync) off = E.handle resetStream $ do
state <- readStreamState strm
if isHalfClosedLocal state
then return off
else case otyp of
OObj obj ->
OHeader hdr mnext tlrmkr ->
-- Send headers immediately, without waiting for data
-- No need to check the streaming window (applies to DATA frames only)
outputHeader strm obj mtbq sync off
outputHeader strm hdr mnext tlrmkr sync off
_ -> do
mwait <- checkOpen strm mtbq
case mwait of
Just wait -> do
sync $ Cont wait otyp
return off
Nothing -> do
ok <- sync $ Just otyp
if ok
then do
sws <- getStreamWindowSize strm
cws <- getConnectionWindowSize ctx -- not 0
let lim = min cws sws
output out off lim
else return off
where
resetStream e = do
closed ctx strm (ResetByMe e)
Expand All @@ -182,48 +173,31 @@ frameSender
----------------------------------------------------------------
outputHeader
:: Stream
-> OutObj
-> Maybe (TBQueue StreamingChunk)
-> (Sync -> IO ())
-> [Header]
-> Maybe DynaNext
-> TrailersMaker
-> (Maybe OutputType -> IO Bool)
-> Offset
-> IO Offset
outputHeader strm (OutObj hdr body tlrmkr) mtbq sync off0 = do
outputHeader strm hdr mnext tlrmkr sync off0 = do
-- Header frame and Continuation frame
let sid = streamNumber strm
endOfStream = case body of
OutBodyNone -> True
_ -> False
endOfStream = isNothing mnext
(ths, _) <- toTokenHeaderTable $ fixHeaders hdr
off' <- headerContinue sid ths endOfStream off0
-- halfClosedLocal calls closed which removes
-- the stream from stream table.
when endOfStream $ halfClosedLocal ctx strm Finished
off <- flushIfNecessary off'
let setOutputType otyp = Output strm otyp mtbq sync
case body of
OutBodyNone -> return off
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, closerOrRefresher) <- confPositionReadMaker path
refresh <- case closerOrRefresher of
Closer closer -> timeoutClose threadManager closer
Refresher refresher -> return refresher
let next = fillFileBodyGetNext pread fileoff bytecount refresh
out' = setOutputType $ ONext next tlrmkr
outputOrEnqueueAgain out' off
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
out' = setOutputType $ ONext next tlrmkr
outputOrEnqueueAgain out' off
OutBodyStreaming _ -> do
let out' = setOutputType $ nextForStreaming mtbq tlrmkr
outputOrEnqueueAgain out' off
OutBodyStreamingUnmask _ -> do
let out' = setOutputType $ nextForStreaming mtbq tlrmkr
case mnext of
Nothing -> return off
Just next -> do
let out' = Output strm (ONext next tlrmkr) sync
outputOrEnqueueAgain out' off

----------------------------------------------------------------
output :: Output -> Offset -> WindowSize -> IO Offset
output out@(Output strm (ONext curr tlrmkr) _ sync) off0 lim = do
output out@(Output strm (ONext curr tlrmkr) sync) off0 lim = do
-- Data frame payload
buflim <- readIORef outputBufferLimit
let payloadOff = off0 + frameHeaderLength
Expand All @@ -240,28 +214,15 @@ frameSender
sync
out
reqflush
output (Output strm (OPush ths pid) _ sync) off0 _lim = do
output (Output strm (OPush ths pid) sync) off0 _lim = do
-- Creating a push promise header
-- Frame id should be associated stream id from the client.
let sid = streamNumber strm
len <- pushPromise pid sid ths off0
off <- flushIfNecessary $ off0 + frameHeaderLength + len
sync Done
_ <- sync Nothing
return off
output (Output strm (OObj obj) mtbq sync) off0 _lim = do
-- fixme: never reach?
outputHeader strm obj mtbq sync off0

----------------------------------------------------------------
nextForStreaming
:: Maybe (TBQueue StreamingChunk)
-> TrailersMaker
-> OutputType
nextForStreaming mtbq tlrmkr =
let tbq = fromJust mtbq
takeQ = atomically $ tryReadTBQueue tbq
next = fillStreamBodyGetNext takeQ
in ONext next tlrmkr
output _ _ _ = undefined -- never reached

----------------------------------------------------------------
headerContinue :: StreamId -> TokenHeaderList -> Bool -> Offset -> IO Offset
Expand Down Expand Up @@ -310,7 +271,7 @@ frameSender
-> Int
-> Maybe DynaNext
-> (Maybe ByteString -> IO NextTrailersMaker)
-> (Sync -> IO ())
-> (Maybe OutputType -> IO Bool)
-> Output
-> Bool
-> IO Offset
Expand All @@ -332,7 +293,7 @@ frameSender
else return (Just trailers, defaultFlags)
fillFrameHeader FrameData datPayloadLen streamNumber flag buf
off'' <- handleTrailers mtrailers off'
sync Done
_ <- sync Nothing
halfClosedLocal ctx strm Finished
decreaseWindowSize ctx strm datPayloadLen
if reqflush
Expand Down Expand Up @@ -410,21 +371,3 @@ frameSender
, flags = flag
, streamId = sid
}

checkOpen :: Stream -> Maybe (TBQueue a) -> IO (Maybe (IO ()))
checkOpen strm mtbq = case mtbq of
Nothing -> checkStreamWindowSize
Just tbq -> checkStreaming tbq
where
checkStreaming tbq = do
isEmpty <- atomically $ isEmptyTBQueue tbq
if isEmpty
then do
return $ Just (waitStreaming tbq)
else checkStreamWindowSize
-- FLOW CONTROL: WINDOW_UPDATE: send: respecting peer's limit
checkStreamWindowSize = do
sws <- getStreamWindowSize strm
if sws <= 0
then return $ Just (waitStreamWindowSize strm)
else return Nothing
Loading

0 comments on commit b948b49

Please sign in to comment.