Skip to content

Commit

Permalink
WIP: background-worker changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mdimjasevic committed Jan 24, 2024
1 parent 3696927 commit dcbbd82
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
module Wire.API.Federation.BackendNotifications where

import Control.Exception
import Control.Monad.Codensity
import Control.Monad.Except
import Data.Aeson qualified as A
import Data.Domain
Expand Down Expand Up @@ -63,9 +64,12 @@ data PayloadBundle (c :: Component) = PayloadBundle
deriving (A.ToJSON, A.FromJSON) via (Schema (PayloadBundle c))

instance KnownComponent c => Semigroup (PayloadBundle c) where
-- TODO(md): replace the Semigroup instance by a custom function that can
-- fail.
b1 <> b2 =
PayloadBundle
{ originDomain = b1.originDomain,
-- the assumption is that b2 has the same origin and target domain.
targetDomain = b1.targetDomain,
notifications = notifications b1 <> notifications b2
}
Expand All @@ -80,7 +84,7 @@ instance ToSchema (PayloadBundle c) where

type BackendNotificationAPI = Capture "name" Text :> ReqBody '[JSON] RawJson :> Post '[JSON] EmptyResponse

sendNotification :: FederatorClientEnv -> Component -> Text -> RawJson -> IO (Either FederatorClientError ())
sendNotification :: FederatorClientVersionedEnv -> Component -> Text -> RawJson -> IO (Either FederatorClientError ())
sendNotification env component path body =
case component of
Brig -> go @'Brig
Expand All @@ -93,8 +97,11 @@ sendNotification env component path body =

go :: forall c. (KnownComponent c) => IO (Either FederatorClientError ())
go =
runFederatorClient env . void $
clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body
lowerCodensity
. runExceptT
. runVersionedFederatorClientToCodensity env
. void
$ clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body

enqueue :: Q.Channel -> RequestId -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c a -> IO a
enqueue channel requestId originDomain targetDomain deliveryMode (FedQueueClient action) =
Expand Down
11 changes: 10 additions & 1 deletion libs/wire-api-federation/src/Wire/API/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,16 @@ instance KnownComponent c => RunClient (FederatorClient c) where
expectedStatuses

v <- asks cveVersion
let vreq = req {requestHeaders = (versionHeader, toByteString' (versionInt (fromMaybe V0 v))) :<| requestHeaders req}
let vreq =
req
{ requestHeaders =
( versionHeader,
toByteString'
( versionInt (fromMaybe V0 v)
)
)
:<| requestHeaders req
}

withHTTP2StreamingRequest successfulStatus vreq $ \resp -> do
bdy <-
Expand Down
82 changes: 69 additions & 13 deletions services/background-worker/src/Wire/BackendNotificationPusher.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import Network.RabbitMqAdmin
import Prometheus
import System.Logger.Class qualified as Log
import UnliftIO
import Wire.API.Federation.API
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.Version
import Wire.BackgroundWorker.Env
import Wire.BackgroundWorker.Options
import Wire.BackgroundWorker.Util
Expand Down Expand Up @@ -70,16 +72,31 @@ pushNotification runningFlag targetDomain (msg, envelope) = do
recovering policy handlers $
const $
-- Ensure that the mvars are reset correctly.
-- Ensure that the mvars are reset correctly.
-- Ensure that the mvars are reset correctly.
-- takeMVar also has the nice feature of being a second layer of protection
-- takeMVar also has the nice feature of being a second layer of protection
-- takeMVar also has the nice feature of being a second layer of protection
-- against lazy thread updates in `amqp`. If this somehow gets called while
-- against lazy thread updates in `amqp`. If this somehow gets called while
-- against lazy thread updates in `amqp`. If this somehow gets called while
-- we are trying to cleanup workers for a shutdown, this will call will block
-- we are trying to cleanup workers for a shutdown, this will call will block
-- we are trying to cleanup workers for a shutdown, this will call will block
-- and prevent the message from being sent out as we are tearing down resources.
-- and prevent the message from being sent out as we are tearing down resources.
-- and prevent the message from being sent out as we are tearing down resources.
-- This removes one way that a message might be delivered twice.
-- This removes one way that a message might be delivered twice.
-- This removes one way that a message might be delivered twice.
UnliftIO.bracket_ (takeMVar runningFlag) (putMVar runningFlag ()) go
where
go :: AppT IO ()
go = case A.eitherDecode @BackendNotification (Q.msgBody msg) of
-- go = case A.eitherDecode @BackendNotification (Q.msgBody msg) of
go = case A.eitherDecode @(PayloadBundle _) (Q.msgBody msg) of
Left e -> do
-- TODO(md): try to parse this as an old 'BackendNotification' to
-- maintain backward compatibility.
Log.err $
Log.msg (Log.val "Failed to parse notification, the notification will be ignored")
. Log.field "domain" (domainText targetDomain)
Expand All @@ -92,18 +109,57 @@ pushNotification runningFlag targetDomain (msg, envelope) = do
-- this message blocks the whole queue. Perhaps there is a better way to
-- deal with this.
lift $ reject envelope False
Right notif -> do
ceFederator <- asks (.federatorInternal)
ceHttp2Manager <- asks http2Manager
let ceOriginDomain = notif.ownDomain
ceTargetDomain = targetDomain
ceOriginRequestId = fromMaybe (RequestId "N/A") notif.requestId
fcEnv = FederatorClientEnv {..}
liftIO $ either throwM pure =<< sendNotification fcEnv notif.targetComponent notif.path notif.body
lift $ ack envelope
metrics <- asks backendNotificationMetrics
withLabel metrics.pushedCounter (domainText targetDomain) incCounter
withLabel metrics.stuckQueuesGauge (domainText targetDomain) (flip setGauge 0)
-- Right notif -> do
Right bundle -> do
federator <- asks (.federatorInternal)
manager <- asks http2Manager
let env =
FederatorClientEnv
{ ceOriginDomain = bundle.originDomain,
ceTargetDomain = bundle.targetDomain,
ceFederator = federator,
ceHttp2Manager = manager,
ceOriginRequestId = RequestId "N/A"
}
remoteVersions :: Set Int <-
liftIO (runFederatorClient env $ fedClient @'Brig @"api-version" ()) >>= \case
Left e -> do
Log.err $
Log.msg (Log.val "Failed to get supported API versions, the notification will be ignored")
. Log.field "domain" (domainText targetDomain)
. Log.field "error" (displayException e)
throwIO e -- TODO(md): do something better here
Right vi -> pure . Set.fromList . fmap versionInt . vinfoSupported $ vi
let mostRecentNotif = foldl' combine Nothing (notifications bundle)
combine ::
Maybe (BackendNotification, Version) ->
BackendNotification ->
Maybe (BackendNotification, Version)
combine greatest notif =
let notifGreatest = bodyVersions notif >>= flip versionInCommon remoteVersions
in case (greatest, notifGreatest) of
(Nothing, Nothing) -> Nothing
(Nothing, Just v) -> Just (notif, v)
(Just (gn, gv), Nothing) -> Just (gn, gv)
(Just (gn, gv), Just v) ->
if v > gv
then Just (notif, v)
else Just (gn, gv)
case mostRecentNotif of
Nothing -> undefined -- TODO(md): log an error as there is no version in common.
Just (notif, Just -> cveVersion) -> do
ceFederator <- asks (.federatorInternal)
ceHttp2Manager <- asks http2Manager
let ceOriginDomain = notif.ownDomain
ceTargetDomain = targetDomain
ceOriginRequestId = fromMaybe (RequestId "N/A") notif.requestId
cveEnv = FederatorClientEnv {..}
fcEnv = FederatorClientVersionedEnv {..}
liftIO $ either throwM pure =<< sendNotification fcEnv notif.targetComponent notif.path notif.body
lift $ ack envelope
metrics <- asks backendNotificationMetrics
withLabel metrics.pushedCounter (domainText targetDomain) incCounter
withLabel metrics.stuckQueuesGauge (domainText targetDomain) (flip setGauge 0)

-- FUTUREWORK: Recosider using 1 channel for many consumers. It shouldn't matter
-- for a handful of remote domains.
Expand Down

0 comments on commit dcbbd82

Please sign in to comment.