From df3b1cc0174234742d1f41eb6f6d31c09fb98a7b Mon Sep 17 00:00:00 2001 From: s12f <97083380+s12f@users.noreply.github.com> Date: Fri, 17 Nov 2023 18:26:38 +0800 Subject: [PATCH] fix(kafka): load group and offsets (#1687) --- hstream-kafka/HStream/Kafka/Group/Group.hs | 18 ++++++++++++++++-- .../HStream/Kafka/Group/GroupCoordinator.hs | 18 ++++++++++-------- .../HStream/Kafka/Group/GroupOffsetManager.hs | 5 +++++ 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs index f791bf338..6c42de49f 100644 --- a/hstream-kafka/HStream/Kafka/Group/Group.hs +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -143,13 +143,15 @@ data Group -- metastore , metaHandle :: Meta.MetaHandle + + -- + , storedMetadata :: IO.IORef Bool } newGroup :: T.Text -> GroupOffsetManager -> Meta.MetaHandle -> IO Group newGroup group metadataManager metaHandle = do lock <- C.newMVar () state <- IO.newIORef Empty - -- TODO: -1 by default ? groupGenerationId <- IO.newIORef 0 leader <- IO.newIORef Nothing members <- H.new @@ -166,6 +168,8 @@ newGroup group metadataManager metaHandle = do protocolName <- IO.newIORef Nothing supportedProtocols <- IO.newIORef Set.empty + storedMetadata <- IO.newIORef False + return $ Group { lock = lock , groupId = group @@ -191,6 +195,8 @@ newGroup group metadataManager metaHandle = do , supportedProtocols = supportedProtocols , metaHandle = metaHandle + + , storedMetadata = storedMetadata } newGroupFromValue :: CM.GroupMetadataValue -> GroupOffsetManager -> Meta.MetaHandle -> IO Group @@ -199,7 +205,6 @@ newGroupFromValue value metadataManager metaHandle = do state <- IO.newIORef (if V.null value.members then Empty else Stable) - -- TODO: -1 by default ? groupGenerationId <- IO.newIORef value.generationId leader <- IO.newIORef value.leader @@ -215,6 +220,8 @@ newGroupFromValue value metadataManager metaHandle = do supportedProtocols <- IO.newIORef Set.empty + storedMetadata <- IO.newIORef True + let group = Group { lock = lock , groupId = value.groupId @@ -240,6 +247,8 @@ newGroupFromValue value metadataManager metaHandle = do , supportedProtocols = supportedProtocols , metaHandle = metaHandle + + , storedMetadata = storedMetadata } -- add members @@ -785,10 +794,14 @@ commitOffsets group@Group{..} req = do CompletingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS) Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) _ -> do + -- updateLatestHeartbeat -- TODO: udpate heartbeat topics <- Utils.forKaArrayM req.topics $ \K.OffsetCommitRequestTopicV0{..} -> do res <- GOM.storeOffsets metadataManager name partitions return $ K.OffsetCommitResponseTopicV0 {partitions = res, name = name} + Utils.whenIORefEq storedMetadata False $ do + Log.info $ "commited offsets on Empty Group, storing Empty Group:" <> Log.build group.groupId + storeGroup group Map.empty return K.OffsetCommitResponseV0 {topics=topics} validateOffsetcommit :: Group -> K.OffsetCommitRequestV2 -> IO () @@ -863,6 +876,7 @@ storeGroup :: Group -> Map.Map T.Text BS.ByteString -> IO () storeGroup group assignments = do value <- getGroupValue group assignments Meta.upsertMeta @CM.GroupMetadataValue group.groupId value group.metaHandle + IO.atomicWriteIORef group.storedMetadata True getGroupValue :: Group -> Map.Map T.Text BS.ByteString -> IO CM.GroupMetadataValue getGroupValue group assignments = do diff --git a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs index 01bb4fe61..461ab9773 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs @@ -20,6 +20,7 @@ import qualified HStream.Kafka.Common.Utils as Utils import HStream.Kafka.Group.Group (Group) import qualified HStream.Kafka.Group.Group as G import HStream.Kafka.Group.GroupOffsetManager (mkGroupOffsetManager) +import qualified HStream.Kafka.Group.GroupOffsetManager as GOM import qualified HStream.Logger as Log import qualified HStream.MetaStore.Types as Meta import HStream.Store (LDClient) @@ -52,7 +53,7 @@ instance TM.TaskManager GroupCoordinator where listAllTasks gc = do V.fromList . map CM.groupId <$> Meta.listMeta @CM.GroupMetadataValue gc.metaHandle - loadTaskAsync = loadGroup + loadTaskAsync = loadGroupAndOffsets unloadTaskAsync = unloadGroup @@ -235,23 +236,24 @@ describeGroups gc req = do ------------------- Load/Unload Group ------------------------- -- load group from meta store -loadGroup :: GroupCoordinator -> T.Text -> IO () -loadGroup gc groupId = do +loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO () +loadGroupAndOffsets gc groupId = do + offsetManager <- mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId + GOM.loadOffsetsFromStorage offsetManager Meta.getMeta @CM.GroupMetadataValue groupId gc.metaHandle >>= \case Nothing -> do Log.warning $ "load group failed, group:" <> Log.build groupId <> " not found in metastore" Just value -> do Log.info $ "loading group from metastore, groupId:" <> Log.build groupId <> ", generationId:" <> Log.build value.generationId - addGroupByValue gc value + addGroupByValue gc value offsetManager -addGroupByValue :: GroupCoordinator -> CM.GroupMetadataValue -> IO () -addGroupByValue gc value = do +addGroupByValue :: GroupCoordinator -> CM.GroupMetadataValue -> GOM.GroupOffsetManager -> IO () +addGroupByValue gc value offsetManager = do C.withMVar gc.groups $ \gs -> do H.lookup gs value.groupId >>= \case Nothing -> do - metadataManager <- mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) value.groupId - ng <- G.newGroupFromValue value metadataManager gc.metaHandle + ng <- G.newGroupFromValue value offsetManager gc.metaHandle H.insert gs value.groupId ng Just _ -> do Log.warning $ "load group failed, group:" <> Log.build value.groupId <> " is loaded" diff --git a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs index 014cf23d6..d8ae8a777 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs @@ -6,6 +6,7 @@ module HStream.Kafka.Group.GroupOffsetManager , storeOffsets , fetchOffsets , fetchAllOffsets + , nullOffsets , loadOffsetsFromStorage ) where @@ -198,6 +199,10 @@ fetchAllOffsets GroupOffsetManager{..} = do foldF tp offset = Map.insertWith (V.++) tp.topicName (V.singleton (makePartition tp.topicPartitionIdx offset)) makeTopic (name, partitions) = K.OffsetFetchResponseTopicV0 {partitions=KaArray (Just partitions), name=name} +nullOffsets :: GroupOffsetManager -> IO Bool +nullOffsets GroupOffsetManager{..} = do + Map.null <$> readIORef offsetsCache + ------------------------------------------------------------------------------------------------- -- helper