Skip to content

Commit

Permalink
Make all fedQueue endpoints use the new client
Browse files Browse the repository at this point in the history
  • Loading branch information
mdimjasevic committed Jan 26, 2024
1 parent 2a7641d commit c10068c
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 50 deletions.
30 changes: 2 additions & 28 deletions libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ module Wire.API.Federation.API
fedClient,
fedQueueClient,
toBundle,
fedQueueClientBundle,
fedClientIn,
unsafeFedClientIn,
module Wire.API.MakesFederatedCall,
Expand All @@ -51,7 +50,6 @@ import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client
import Wire.API.Federation.Component
import Wire.API.Federation.Endpoint
import Wire.API.Federation.HasNotificationEndpoint
import Wire.API.MakesFederatedCall
import Wire.API.Routes.Named

Expand Down Expand Up @@ -96,11 +94,11 @@ fedClientIn ::
Client m api
fedClientIn = clientIn (Proxy @api) (Proxy @m)

fedQueueClientBundle ::
fedQueueClient ::
KnownComponent c =>
PayloadBundle c ->
FedQueueClient c ()
fedQueueClientBundle bundle = do
fedQueueClient bundle = do
env <- ask
let msg =
newMsg
Expand All @@ -114,30 +112,6 @@ fedQueueClientBundle bundle = do
ensureQueue env.channel env.targetDomain._domainText
void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg

fedQueueClient ::
forall {k} (tag :: k).
( HasNotificationEndpoint tag,
KnownSymbol (NotificationPath tag),
KnownComponent (NotificationComponent k),
ToJSON (Payload tag)
) =>
Payload tag ->
FedQueueClient (NotificationComponent k) ()
fedQueueClient payload = do
env <- ask
let notif = fedNotifToBackendNotif @tag env.requestId env.originDomain payload
msg =
newMsg
{ msgBody = encode notif,
msgDeliveryMode = Just (env.deliveryMode),
msgContentType = Just "application/json"
}
-- Empty string means default exchange
exchange = ""
liftIO $ do
ensureQueue env.channel env.targetDomain._domainText
void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg

-- | Like 'fedClientIn', but doesn't propagate a 'CallsFed' constraint. Intended
-- to be used in test situations only.
unsafeFedClientIn ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ ensureQueue chan queue = do
newtype FedQueueClient c a = FedQueueClient (ReaderT FedQueueEnv IO a)
deriving (Functor, Applicative, Monad, MonadIO, MonadReader FedQueueEnv)

reqOrigin :: FedQueueClient c (RequestId, Domain)
reqOrigin = do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
pure (reqId, origin)

data FedQueueEnv = FedQueueEnv
{ channel :: Q.Channel,
originDomain :: Domain,
Expand Down
5 changes: 3 additions & 2 deletions services/brig/src/Brig/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ notifyUserDeleted self remotes = do
view rabbitmqChannel >>= \case
Just chanVar -> do
enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $
void $
fedQueueClient @'OnUserDeletedConnectionsTag notif
void $ do
(reqId, origin) <- reqOrigin
fedQueueClient $ toBundle @'OnUserDeletedConnectionsTag reqId origin notif
Nothing ->
Log.err $
Log.msg ("Federation error while notifying remote backends of a user deletion." :: ByteString)
Expand Down
5 changes: 2 additions & 3 deletions services/galley/src/Galley/API/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -899,9 +899,8 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do
-- itself using the ConversationUpdate returned by this function
if notifyOrigDomain || tDomain ruids /= qDomain quid
then do
reqId <- asks (.requestId)
origin <- asks (.originDomain)
fedQueueClientBundle (toBundle @'OnConversationUpdatedTag reqId origin update) $> Nothing
(reqId, origin) <- reqOrigin
fedQueueClient (toBundle @'OnConversationUpdatedTag reqId origin update) $> Nothing
else pure (Just update)

-- notify local participants and bots
Expand Down
10 changes: 9 additions & 1 deletion services/galley/src/Galley/API/Clients.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import Polysemy.TinyLog qualified as P
import Wire.API.Conversation hiding (Member)
import Wire.API.Federation.API
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Routes.MultiTablePaging
import Wire.Sem.Paging.Cassandra (CassandraPaging)

Expand Down Expand Up @@ -137,5 +138,12 @@ rmClientH (usr ::: cid) = do
removeRemoteMLSClients :: Range 1 1000 [Remote ConvId] -> Sem r ()
removeRemoteMLSClients convIds = do
for_ (bucketRemote (fromRange convIds)) $ \remoteConvs ->
let rpc = void $ fedQueueClient @'OnClientRemovedTag (ClientRemovedRequest usr cid (tUnqualified remoteConvs))
let rpc = void $ do
(req, origin) <- reqOrigin
fedQueueClient
( toBundle @'OnClientRemovedTag
req
origin
(ClientRemovedRequest usr cid (tUnqualified remoteConvs))
)
in enqueueNotification remoteConvs Q.Persistent rpc
6 changes: 5 additions & 1 deletion services/galley/src/Galley/API/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ import Wire.API.Event.Conversation
import Wire.API.Event.LeaveReason
import Wire.API.Federation.API
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Error
import Wire.API.Provider.Service hiding (Service)
import Wire.API.Routes.API
Expand Down Expand Up @@ -426,7 +427,10 @@ rmUser lusr conn = do
leaveRemoteConversations cids =
for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do
let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs))
let rpc = void $ fedQueueClient @'OnUserDeletedConversationsTag userDelete
let rpc = void $ do
(req, origin) <- reqOrigin
fedQueueClient $
toBundle @'OnUserDeletedConversationsTag req origin userDelete
enqueueNotification remoteConvs Q.Persistent rpc

-- FUTUREWORK: Add a retry mechanism if there are federation errrors.
Expand Down
34 changes: 20 additions & 14 deletions services/galley/src/Galley/API/MLS/Propagate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import Polysemy.TinyLog hiding (trace)
import Wire.API.Event.Conversation
import Wire.API.Federation.API
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.MLS.Credential
import Wire.API.MLS.Message
import Wire.API.MLS.Serialisation
Expand Down Expand Up @@ -88,20 +89,25 @@ propagateMessage qusr mSenderClient lConvOrSub con msg cm = do

-- send to remotes
(either (logRemoteNotificationError @"on-mls-message-sent") (const (pure ())) <=< enqueueNotificationsConcurrently Q.Persistent (map remoteMemberQualify rmems)) $
\rs ->
fedQueueClient @'OnMLSMessageSentTag $
RemoteMLSMessage
{ time = now,
sender = qusr,
metadata = mm,
conversation = qUnqualified qcnv,
subConversation = sconv,
recipients =
Map.fromList $
tUnqualified rs
>>= toList . remoteMemberMLSClients,
message = Base64ByteString msg.raw
}
\rs -> do
(reqId, origin) <- reqOrigin
fedQueueClient $
toBundle @'OnMLSMessageSentTag
reqId
origin
( RemoteMLSMessage
{ time = now,
sender = qusr,
metadata = mm,
conversation = qUnqualified qcnv,
subConversation = sconv,
recipients =
Map.fromList $
tUnqualified rs
>>= toList . remoteMemberMLSClients,
message = Base64ByteString msg.raw
}
)
where
cmWithoutSender = maybe cm (flip cmRemoveClient cm . mkClientIdentity qusr) mSenderClient

Expand Down
5 changes: 4 additions & 1 deletion services/galley/src/Galley/API/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import Wire.API.Event.Conversation
import Wire.API.Federation.API
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Galley
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Client (FederatorClient)
import Wire.API.Federation.Error
import Wire.API.Message
Expand Down Expand Up @@ -697,7 +698,9 @@ sendRemoteMessages domain now sender senderClient lcnv metadata messages = (hand
transient = mmTransient metadata,
recipients = UserClientMap rcpts
}
let rpc = void $ fedQueueClient @'OnMessageSentTag rm
let rpc = void $ do
(reqId, origin) <- reqOrigin
fedQueueClient $ toBundle @'OnMessageSentTag reqId origin rm
enqueueNotification domain Q.Persistent rpc
where
handle :: Either FederationError a -> Sem r (Set (UserId, ClientId))
Expand Down

0 comments on commit c10068c

Please sign in to comment.