Skip to content

Commit

Permalink
recovering rx flow
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Jun 24, 2024
1 parent 3c064e1 commit 06aa904
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 12 deletions.
21 changes: 12 additions & 9 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ run cconf@ClientConfig{..} conf client = do
clientCore ctx mgr req processResponse = do
strm <- sendRequest ctx mgr scheme authority req
rsp <- getResponse strm
processResponse rsp
x <- processResponse rsp
adjustRxWindow (rxFlow ctx) strm
return x
runClient ctx mgr = do
x <- client (clientCore ctx mgr) $ aux ctx
waitCounter0 mgr
Expand Down Expand Up @@ -205,14 +207,15 @@ sendStreaming Context{..} mgr req sid newstrm strmbdy = do
forkManagedUnmask mgr $ \unmask -> do
decrementedCounter <- newIORef False
let decCounterOnce = do
alreadyDecremented <- atomicModifyIORef decrementedCounter $ \b -> (True, b)
unless alreadyDecremented $ decCounter mgr
let iface = OutBodyIface {
outBodyUnmask = unmask
, outBodyPush = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b Nothing)
, outBodyPushFinal = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b (Just decCounterOnce))
, outBodyFlush = atomically $ writeTBQueue tbq StreamingFlush
}
alreadyDecremented <- atomicModifyIORef decrementedCounter $ \b -> (True, b)
unless alreadyDecremented $ decCounter mgr
let iface =
OutBodyIface
{ outBodyUnmask = unmask
, outBodyPush = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b Nothing)
, outBodyPushFinal = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b (Just decCounterOnce))
, outBodyFlush = atomically $ writeTBQueue tbq StreamingFlush
}
finished = atomically $ writeTBQueue tbq $ StreamingFinished decCounterOnce
incCounter mgr
strmbdy iface `finally` finished
Expand Down
27 changes: 27 additions & 0 deletions Network/HTTP2/H2/Context.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

module Network.HTTP2.H2.Context where

import qualified Data.ByteString as BS
import Data.IORef
import Network.Control
import Network.Socket (SockAddr)
Expand Down Expand Up @@ -211,6 +212,32 @@ closed ctx@Context{oddStreamTable, evenStreamTable} strm@Stream{streamNumber} cc
err :: SomeException
err = toException (closedCodeToError streamNumber cc)

----------------------------------------------------------------

adjustRxWindow :: IORef RxFlow -> Stream -> IO ()
adjustRxWindow refRx Stream{streamRxQ} = do
mq <- readIORef streamRxQ
case mq of
Nothing -> return ()
Just q -> do
len <- readQ q
when (len > 0) $
void $
atomicModifyIORef refRx $
maybeOpenRxWindow len FCTWindowUpdate
where
readQ q = atomically $ loop 0
where
loop total = do
meb <- tryReadTQueue q
case meb of
Just (Right (bs, _)) -> loop (total + BS.length bs)
Just le@(Left _) -> do
-- reserving HTTP2Error
writeTQueue q le
return total
_ -> return total

----------------------------------------------------------------
-- From peer

Expand Down
3 changes: 2 additions & 1 deletion Network/HTTP2/H2/Receiver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,12 @@ processState (Open _ (NoBody tbl@(_, reqvt))) ctx@Context{..} strm@Stream{stream
return False

-- Transition (process2)
processState (Open hcl (HasBody tbl@(_, reqvt))) ctx@Context{..} strm@Stream{streamInput} _streamId = do
processState (Open hcl (HasBody tbl@(_, reqvt))) ctx@Context{..} strm@Stream{streamInput, streamRxQ} _streamId = do
let mcl = fst <$> (getFieldValue tokenContentLength reqvt >>= C8.readInt)
bodyLength <- newIORef 0
tlr <- newIORef Nothing
q <- newTQueueIO
writeIORef streamRxQ $ Just q
setStreamState ctx strm $ Open hcl (Body q mcl bodyLength tlr)
-- FLOW CONTROL: WINDOW_UPDATE 0: recv: announcing my limit properly
-- FLOW CONTROL: WINDOW_UPDATE: recv: announcing my limit properly
Expand Down
2 changes: 2 additions & 0 deletions Network/HTTP2/H2/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ newOddStream sid txwin rxwin =
<*> newEmptyMVar
<*> newTVarIO (newTxFlow txwin)
<*> newIORef (newRxFlow rxwin)
<*> newIORef Nothing

newEvenStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newEvenStream sid txwin rxwin =
Expand All @@ -60,6 +61,7 @@ newEvenStream sid txwin rxwin =
<*> newEmptyMVar
<*> newTVarIO (newTxFlow txwin)
<*> newIORef (newRxFlow rxwin)
<*> newIORef Nothing

----------------------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions Network/HTTP2/H2/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ data Stream = Stream
, streamInput :: MVar (Either SomeException InpObj) -- Client only
, streamTxFlow :: TVar TxFlow
, streamRxFlow :: IORef RxFlow
, streamRxQ :: IORef (Maybe RxQ)
}

instance Show Stream where
Expand Down
9 changes: 7 additions & 2 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Network.HTTP2.Server.Worker (
) where

import Data.IORef
import Network.Control
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import Network.HTTP.Semantics.Server
Expand All @@ -35,6 +36,7 @@ data WorkerConf a = WorkerConf
, makePushStream :: a -> PushPromise -> IO (StreamId, a)
, mySockAddr :: SockAddr
, peerSockAddr :: SockAddr
, connRxFlow :: IORef RxFlow
}

fromContext :: Context -> WorkerConf Stream
Expand All @@ -56,6 +58,7 @@ fromContext ctx@Context{..} =
return (pid, newstrm)
, mySockAddr = mySockAddr
, peerSockAddr = peerSockAddr
, connRxFlow = rxFlow
}

----------------------------------------------------------------
Expand Down Expand Up @@ -162,7 +165,7 @@ response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
(_, reqvt) = inpObjHeaders req

-- | Worker for server applications.
worker :: WorkerConf a -> Manager -> Server -> Action
worker :: WorkerConf Stream -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
sinfo <- newStreamInfo
tcont <- newThreadContinue
Expand All @@ -178,7 +181,9 @@ worker wc@WorkerConf{..} mgr server = do
T.resume th
T.tickle th
let aux = Aux th mySockAddr peerSockAddr
server (Request req') aux $ response wc mgr th tcont strm (Request req')
r <- server (Request req') aux $ response wc mgr th tcont strm (Request req')
adjustRxWindow connRxFlow strm
return r
cont1 <- case ex of
Right () -> return True
Left e@(SomeException _)
Expand Down

0 comments on commit 06aa904

Please sign in to comment.