Skip to content

Commit

Permalink
Merge branch 'main' into kafka-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Nov 17, 2023
2 parents 4c54b03 + df3b1cc commit 65fe839
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
18 changes: 16 additions & 2 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ data Group

-- metastore
, metaHandle :: Meta.MetaHandle

--
, storedMetadata :: IO.IORef Bool
}

newGroup :: T.Text -> GroupOffsetManager -> Meta.MetaHandle -> IO Group
newGroup group metadataManager metaHandle = do
lock <- C.newMVar ()
state <- IO.newIORef Empty
-- TODO: -1 by default ?
groupGenerationId <- IO.newIORef 0
leader <- IO.newIORef Nothing
members <- H.new
Expand All @@ -166,6 +168,8 @@ newGroup group metadataManager metaHandle = do
protocolName <- IO.newIORef Nothing
supportedProtocols <- IO.newIORef Set.empty

storedMetadata <- IO.newIORef False

return $ Group
{ lock = lock
, groupId = group
Expand All @@ -191,6 +195,8 @@ newGroup group metadataManager metaHandle = do
, supportedProtocols = supportedProtocols

, metaHandle = metaHandle

, storedMetadata = storedMetadata
}

newGroupFromValue :: CM.GroupMetadataValue -> GroupOffsetManager -> Meta.MetaHandle -> IO Group
Expand All @@ -199,7 +205,6 @@ newGroupFromValue value metadataManager metaHandle = do

state <- IO.newIORef (if V.null value.members then Empty else Stable)

-- TODO: -1 by default ?
groupGenerationId <- IO.newIORef value.generationId
leader <- IO.newIORef value.leader

Expand All @@ -215,6 +220,8 @@ newGroupFromValue value metadataManager metaHandle = do

supportedProtocols <- IO.newIORef Set.empty

storedMetadata <- IO.newIORef True

let group = Group
{ lock = lock
, groupId = value.groupId
Expand All @@ -240,6 +247,8 @@ newGroupFromValue value metadataManager metaHandle = do
, supportedProtocols = supportedProtocols

, metaHandle = metaHandle

, storedMetadata = storedMetadata
}

-- add members
Expand Down Expand Up @@ -785,10 +794,14 @@ commitOffsets group@Group{..} req = do
CompletingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS)
Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID)
_ -> do
-- updateLatestHeartbeat
-- TODO: udpate heartbeat
topics <- Utils.forKaArrayM req.topics $ \K.OffsetCommitRequestTopicV0{..} -> do
res <- GOM.storeOffsets metadataManager name partitions
return $ K.OffsetCommitResponseTopicV0 {partitions = res, name = name}
Utils.whenIORefEq storedMetadata False $ do
Log.info $ "commited offsets on Empty Group, storing Empty Group:" <> Log.build group.groupId
storeGroup group Map.empty
return K.OffsetCommitResponseV0 {topics=topics}

validateOffsetcommit :: Group -> K.OffsetCommitRequestV2 -> IO ()
Expand Down Expand Up @@ -863,6 +876,7 @@ storeGroup :: Group -> Map.Map T.Text BS.ByteString -> IO ()
storeGroup group assignments = do
value <- getGroupValue group assignments
Meta.upsertMeta @CM.GroupMetadataValue group.groupId value group.metaHandle
IO.atomicWriteIORef group.storedMetadata True

getGroupValue :: Group -> Map.Map T.Text BS.ByteString -> IO CM.GroupMetadataValue
getGroupValue group assignments = do
Expand Down
18 changes: 10 additions & 8 deletions hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import qualified HStream.Kafka.Common.Utils as Utils
import HStream.Kafka.Group.Group (Group)
import qualified HStream.Kafka.Group.Group as G
import HStream.Kafka.Group.GroupOffsetManager (mkGroupOffsetManager)
import qualified HStream.Kafka.Group.GroupOffsetManager as GOM
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as Meta
import HStream.Store (LDClient)
Expand Down Expand Up @@ -52,7 +53,7 @@ instance TM.TaskManager GroupCoordinator where
listAllTasks gc = do
V.fromList . map CM.groupId <$> Meta.listMeta @CM.GroupMetadataValue gc.metaHandle

loadTaskAsync = loadGroup
loadTaskAsync = loadGroupAndOffsets

unloadTaskAsync = unloadGroup

Expand Down Expand Up @@ -235,23 +236,24 @@ describeGroups gc req = do

------------------- Load/Unload Group -------------------------
-- load group from meta store
loadGroup :: GroupCoordinator -> T.Text -> IO ()
loadGroup gc groupId = do
loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO ()
loadGroupAndOffsets gc groupId = do
offsetManager <- mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId
GOM.loadOffsetsFromStorage offsetManager
Meta.getMeta @CM.GroupMetadataValue groupId gc.metaHandle >>= \case
Nothing -> do
Log.warning $ "load group failed, group:" <> Log.build groupId <> " not found in metastore"
Just value -> do
Log.info $ "loading group from metastore, groupId:" <> Log.build groupId
<> ", generationId:" <> Log.build value.generationId
addGroupByValue gc value
addGroupByValue gc value offsetManager

addGroupByValue :: GroupCoordinator -> CM.GroupMetadataValue -> IO ()
addGroupByValue gc value = do
addGroupByValue :: GroupCoordinator -> CM.GroupMetadataValue -> GOM.GroupOffsetManager -> IO ()
addGroupByValue gc value offsetManager = do
C.withMVar gc.groups $ \gs -> do
H.lookup gs value.groupId >>= \case
Nothing -> do
metadataManager <- mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) value.groupId
ng <- G.newGroupFromValue value metadataManager gc.metaHandle
ng <- G.newGroupFromValue value offsetManager gc.metaHandle
H.insert gs value.groupId ng
Just _ -> do
Log.warning $ "load group failed, group:" <> Log.build value.groupId <> " is loaded"
Expand Down
5 changes: 5 additions & 0 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module HStream.Kafka.Group.GroupOffsetManager
, storeOffsets
, fetchOffsets
, fetchAllOffsets
, nullOffsets
, loadOffsetsFromStorage
) where

Expand Down Expand Up @@ -204,6 +205,10 @@ fetchAllOffsets GroupOffsetManager{..} = do
foldF tp offset = Map.insertWith (V.++) tp.topicName (V.singleton (makePartition tp.topicPartitionIdx offset))
makeTopic (name, partitions) = K.OffsetFetchResponseTopicV0 {partitions=KaArray (Just partitions), name=name}

nullOffsets :: GroupOffsetManager -> IO Bool
nullOffsets GroupOffsetManager{..} = do
Map.null <$> readIORef offsetsCache

-------------------------------------------------------------------------------------------------
-- helper

Expand Down

0 comments on commit 65fe839

Please sign in to comment.