From 61026817e61c38796469bef17cec62d0cf552d29 Mon Sep 17 00:00:00 2001 From: Mitchell Rosen Date: Mon, 27 Nov 2023 21:47:18 -0500 Subject: [PATCH] rename ThreadFailed to Propagating and move it to its own module --- ki/ki.cabal | 1 + ki/src/Ki.hs | 9 ++----- ki/src/Ki/Internal/Propagating.hs | 40 +++++++++++++++++++++++++++++++ ki/src/Ki/Internal/Scope.hs | 17 ++++++------- ki/src/Ki/Internal/Thread.hs | 35 +-------------------------- 5 files changed, 53 insertions(+), 49 deletions(-) create mode 100644 ki/src/Ki/Internal/Propagating.hs diff --git a/ki/ki.cabal b/ki/ki.cabal index 9339db8..2629c34 100644 --- a/ki/ki.cabal +++ b/ki/ki.cabal @@ -91,6 +91,7 @@ library Ki.Internal.ByteCount Ki.Internal.IO Ki.Internal.NonblockingSTM + Ki.Internal.Propagating Ki.Internal.Scope Ki.Internal.Thread Ki.Internal.ThreadAffinity diff --git a/ki/src/Ki.hs b/ki/src/Ki.hs index 4e340e1..1a27f37 100644 --- a/ki/src/Ki.hs +++ b/ki/src/Ki.hs @@ -48,13 +48,8 @@ import Ki.Internal.Scope fork_, scoped, ) -import Ki.Internal.Thread - ( Thread, - ThreadAffinity (..), - ThreadOptions (..), - await, - defaultThreadOptions, - ) +import Ki.Internal.Thread (Thread, ThreadOptions (..), await, defaultThreadOptions) +import Ki.Internal.ThreadAffinity (ThreadAffinity (..)) -- $introduction -- diff --git a/ki/src/Ki/Internal/Propagating.hs b/ki/src/Ki/Internal/Propagating.hs new file mode 100644 index 0000000..8ef0889 --- /dev/null +++ b/ki/src/Ki/Internal/Propagating.hs @@ -0,0 +1,40 @@ +module Ki.Internal.Propagating + ( pattern PropagatingFrom, + Tid, + peelOffPropagating, + propagate, + ) +where + +import Control.Concurrent (ThreadId) +import Control.Exception (Exception (..), SomeException, asyncExceptionFromException, asyncExceptionToException, throwTo) + +-- Internal exception type thrown by a child thread to its parent, if it fails unexpectedly. +data Propagating = Propagating + { childId :: {-# UNPACK #-} !Tid, + exception :: !SomeException + } + deriving stock (Show) + +instance Exception Propagating where + toException = asyncExceptionToException + fromException = asyncExceptionFromException + +pattern PropagatingFrom :: Tid -> SomeException +pattern PropagatingFrom childId <- (fromException -> Just Propagating {childId}) + +-- A unique identifier for a thread within a scope. (Internal type alias) +type Tid = + Int + +-- Peel an outer Propagating layer off of some exception, if there is one. +peelOffPropagating :: SomeException -> SomeException +peelOffPropagating e0 = + case fromException e0 of + Just (Propagating _ e1) -> e1 + Nothing -> e0 + +-- @propagate exception child parent@ propagates @exception@ from @child@ to @parent@. +propagate :: SomeException -> Tid -> ThreadId -> IO () +propagate exception childId parentThreadId = + throwTo parentThreadId Propagating {childId, exception} diff --git a/ki/src/Ki/Internal/Scope.hs b/ki/src/Ki/Internal/Scope.hs index 678cb4f..e06d4d0 100644 --- a/ki/src/Ki/Internal/Scope.hs +++ b/ki/src/Ki/Internal/Scope.hs @@ -49,7 +49,7 @@ import GHC.Conc.Sync (readTVarIO) import GHC.IO (unsafeUnmask) import IntSupply (IntSupply) import qualified IntSupply -import Ki.Internal.ByteCount +import Ki.Internal.ByteCount (byteCountToInt64) import Ki.Internal.IO ( IOResult (..), UnexceptionalIO (..), @@ -60,7 +60,8 @@ import Ki.Internal.IO uninterruptiblyMasked, ) import Ki.Internal.NonblockingSTM -import Ki.Internal.Thread +import Ki.Internal.Propagating (Tid, peelOffPropagating, propagate, pattern PropagatingFrom) +import Ki.Internal.Thread (Thread, ThreadOptions (..), defaultThreadOptions, forkWithAffinity, makeThread) -- | A scope. -- @@ -171,7 +172,7 @@ scoped action = do -- If one of our children propagated an exception to us, then we know it's about to terminate, so we don't bother -- throwing an exception to it. pure case result of - Left (fromException -> Just ThreadFailed {childId}) -> IntMap.Lazy.delete childId livingChildren0 + Left (PropagatingFrom childId) -> IntMap.Lazy.delete childId livingChildren0 _ -> livingChildren0 -- Deliver a ScopeClosing exception to every living child. @@ -190,8 +191,8 @@ scoped action = do -- By now there are three sources of exception: -- - -- 1) A sync or async exception thrown during the callback, captured in `result`. If applicable, we want to unwrap - -- the `ThreadFailed` off of this, which was only used to indicate it came from one of our children. + -- 1) A sync or async exception thrown during the callback, captured in `result`. If applicable, we want to peel + -- the `Propagating` off of this, which was only used to indicate it came from one of our children. -- -- 2) A sync or async exception left for us in `childExceptionVar` by a child that tried to propagate it to us -- directly, but failed (because we killed it concurrently). @@ -201,7 +202,7 @@ scoped action = do -- -- We cannot throw more than one, so throw them in that priority order. case result of - Left exception -> throwIO (unwrapThreadFailed exception) + Left exception -> throwIO (peelOffPropagating exception) Right value -> tryTakeMVar childExceptionVar >>= \case Nothing -> pure value @@ -429,11 +430,11 @@ propagateException :: Scope -> Tid -> SomeException -> UnexceptionalIO () propagateException Scope {childExceptionVar, parentThreadId, statusVar} childId exception = UnexceptionalIO (readTVarIO statusVar) >>= \case Closing -> tryPutChildExceptionVar -- (A) / (B) - _ -> loop -- we know status is Open here + status -> assert (status >= 0) loop -- we know status is Open here where loop :: UnexceptionalIO () loop = - unexceptionalTry (throwTo parentThreadId ThreadFailed {childId, exception}) >>= \case + unexceptionalTry (propagate exception childId parentThreadId) >>= \case Failure IsScopeClosingException -> tryPutChildExceptionVar -- (C) Failure _ -> loop -- (D) Success _ -> pure () diff --git a/ki/src/Ki/Internal/Thread.hs b/ki/src/Ki/Internal/Thread.hs index 4d82eb0..7c369ba 100644 --- a/ki/src/Ki/Internal/Thread.hs +++ b/ki/src/Ki/Internal/Thread.hs @@ -2,25 +2,14 @@ module Ki.Internal.Thread ( Thread, makeThread, await, - Tid, - ThreadAffinity (..), forkWithAffinity, ThreadOptions (..), defaultThreadOptions, - ThreadFailed (..), - unwrapThreadFailed, ) where import Control.Concurrent (ThreadId, forkOS) -import Control.Exception - ( BlockedIndefinitelyOnSTM (..), - Exception (fromException, toException), - MaskingState (..), - SomeException, - asyncExceptionFromException, - asyncExceptionToException, - ) +import Control.Exception (BlockedIndefinitelyOnSTM (..), MaskingState (..)) import GHC.Conc (STM) import Ki.Internal.ByteCount (ByteCount) import Ki.Internal.IO (forkIO, forkOn, tryEitherSTM) @@ -62,10 +51,6 @@ makeThread threadId action = await_ = tryEitherSTM (\BlockedIndefinitelyOnSTM -> action) pure action } --- A unique identifier for a thread within a scope. (Internal type alias) -type Tid = - Int - -- forkIO/forkOn/forkOS, switching on affinity forkWithAffinity :: ThreadAffinity -> IO () -> IO ThreadId forkWithAffinity = \case @@ -129,24 +114,6 @@ defaultThreadOptions = maskingState = Unmasked } --- Internal exception type thrown by a child thread to its parent, if it fails unexpectedly. -data ThreadFailed = ThreadFailed - { childId :: {-# UNPACK #-} !Tid, - exception :: !SomeException - } - deriving stock (Show) - -instance Exception ThreadFailed where - toException = asyncExceptionToException - fromException = asyncExceptionFromException - --- Peel an outer ThreadFailed layer off of some exception, if there is one. -unwrapThreadFailed :: SomeException -> SomeException -unwrapThreadFailed e0 = - case fromException e0 of - Just (ThreadFailed _ e1) -> e1 - Nothing -> e0 - -- | Wait for a thread to terminate. await :: Thread a -> STM a await =