Merge branch 'main' into kafka_consume_fix
4eUeP authored Nov 16, 2023
2 parents be6d449 + 811f8d4 commit 2841f7c
Showing 4 changed files with 109 additions and 83 deletions.
2 changes: 2 additions & 0 deletions common/server/HStream/Common/Server/MetaData.hs
@@ -145,12 +145,14 @@ kafkaRqTables :: [Text]
kafkaRqTables =
[ myRootPath @TaskAllocation @RHandle
, myRootPath @GroupMetadataValue @RHandle
, myRootPath @Proto.Timestamp @RHandle
]

kafkaFileTables :: [Text]
kafkaFileTables =
[ myRootPath @TaskAllocation @FHandle
, myRootPath @GroupMetadataValue @FHandle
, myRootPath @Proto.Timestamp @FHandle
]

initKafkaZkPaths :: HasCallStack => ZHandle -> IO ()
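Both backends gain the same entry here: Proto.Timestamp is registered under the rqlite handle (RHandle) in kafkaRqTables and under the file handle (FHandle) in kafkaFileTables. As a hedged illustration only (this is not HStream's actual myRootPath, whose definition lives elsewhere), a root-path helper driven by type applications could look like:

{-# LANGUAGE AllowAmbiguousTypes #-}
import Data.Proxy (Proxy (Proxy))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Typeable (Typeable, typeRep)

-- Toy stand-in: path = "/<handle type>/<metadata type>", called as e.g.
-- rootPathFor @Timestamp @FHandle == "/FHandle/Timestamp"
rootPathFor :: forall a h. (Typeable a, Typeable h) => Text
rootPathFor = "/" <> T.pack (show (typeRep (Proxy @h)))
           <> "/" <> T.pack (show (typeRep (Proxy @a)))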
177 changes: 96 additions & 81 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
@@ -6,81 +6,97 @@ module HStream.Kafka.Group.GroupOffsetManager
, storeOffsets
, fetchOffsets
, fetchAllOffsets
, loadOffsetsFromStorage
) where

import Control.Concurrent (MVar, getNumCapabilities,
modifyMVar, modifyMVar_,
newMVar, readMVar,
withMVar)
import Control.Exception (throw)
import Control.Monad (void)
import Data.Hashable
import qualified Data.HashMap.Strict as HM
import Data.Int (Int32, Int64)
import Data.IORef (IORef, modifyIORef',
newIORef, readIORef,
writeIORef)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import Data.Set (Set)
import qualified Data.Set as S
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word64)
import GHC.Generics (Generic)
import System.Clock

import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..),
mkCkpOffsetStorage)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import HStream.Utils (limitedMapConcurrently)
import qualified Kafka.Protocol as K
import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray))
import qualified Kafka.Protocol.Error as K
import Kafka.Protocol.Message (OffsetCommitRequestPartitionV0 (..),
OffsetCommitResponsePartitionV0 (..),
OffsetFetchResponsePartitionV0 (..))

-- NOTE: None of the operations on a GroupOffsetManager are concurrency-safe;
-- the caller must ensure concurrency-safety on its own.
data GroupOffsetManager = forall os. OffsetStorage os => GroupOffsetManager
{ serverId :: Int32
, ldClient :: S.LDClient
, groupName :: T.Text
, offsetStorage :: os
, offsetsCache :: MVar (Map.Map TopicPartition Int64)
, partitionsMap :: MVar (HM.HashMap TopicPartition Word64)
, offsetsCache :: IORef (Map.Map TopicPartition Int64)
, partitionsMap :: IORef (Map.Map TopicPartition Word64)
}
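Per the NOTE above, the move from MVar to IORef for both caches drops per-field locking entirely; the manager relies on the caller serializing access. A minimal sketch of the kind of caller-side guard that assumption implies (the lock and wrapper are hypothetical, not HStream code):

import Control.Concurrent.MVar (MVar, withMVar)

-- All operations on one GroupOffsetManager go through a single group-level lock.
withGroupLock :: MVar () -> IO a -> IO a
withGroupLock lock op = withMVar lock (const op)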

-- FIXME: if we create a consumer group whose groupName has been used before, calling
-- mkCkpOffsetStorage with that groupName may leave us with an unclean ckp-store
mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> IO GroupOffsetManager
mkGroupOffsetManager ldClient serverId groupName = do
offsetsCache <- newMVar Map.empty
partitionsMap <- newMVar HM.empty
offsetsCache <- newIORef Map.empty
partitionsMap <- newIORef Map.empty
offsetStorage <- mkCkpOffsetStorage ldClient groupName
return GroupOffsetManager{..}
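A hedged usage sketch of the constructor together with the new load entry point below (the client handle, server id, and group name are made up):

-- mgr <- mkGroupOffsetManager ldClient 1 "example-group"
-- loadOffsetsFromStorage mgr  -- warm offsetsCache and partitionsMap before serving fetches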

loadOffsetsFromStorage :: GroupOffsetManager -> V.Vector T.Text -> IO ()
loadOffsetsFromStorage GroupOffsetManager{..} topicNames = do
concurrentCap <- getNumCapabilities
void $ limitedMapConcurrently (min 8 concurrentCap) load $ V.toList topicNames
loadOffsetsFromStorage :: GroupOffsetManager -> IO ()
loadOffsetsFromStorage GroupOffsetManager{..} = do
Log.info $ "Consumer group " <> Log.build groupName <> " start load offsets from storage"
start <- getTime Monotonic
tpOffsets <- Map.map fromIntegral <$> loadOffsets offsetStorage groupName
let totalPartitions = length tpOffsets
logIds = S.fromList $ Map.keys tpOffsets
topicNumRef <- newIORef 0
tps <- getTopicPartitions logIds [] topicNumRef
totalTopicNum <- readIORef topicNumRef
let partitionMap = Map.fromList tps
offsetsMap = Map.compose tpOffsets partitionMap
Log.info $ "loadOffsets for group " <> Log.build groupName
<> ", partitionMap: " <> Log.build (show partitionMap)
<> ", offsetsMap: " <> Log.build (show offsetsMap)
-- update offsetsCache
writeIORef offsetsCache offsetsMap
-- update partitionsMap
writeIORef partitionsMap partitionMap
end <- getTime Monotonic
let msDuration = toNanoSecs (end `diffTimeSpec` start) `div` 1000000
Log.info $ "Finish load offsets for consumer group " <> Log.build groupName
<> ", total time " <> Log.build msDuration <> "ms"
<> ", total nums of topics " <> Log.build totalTopicNum
<> ", total nums of partitions " <> Log.build totalPartitions
where
load topicName = do
partitions <- S.listStreamPartitionsOrdered ldClient (S.transToTopicStreamName topicName)
tpOffsets <- loadOffsets offsetStorage topicName

let tpWithLogId = V.zipWith (\(_, logId) idx -> (mkTopicPartition topicName idx, logId)) partitions (V.fromList [0..])
offsetsTuples <- V.forM tpWithLogId $ \(tp, logId) -> do
case Map.lookup logId tpOffsets of
Nothing -> do
-- FIXME: here we found a partition that belongs to the topic, but it doesn't have any offset commit record in
-- the __offset_commit topic. Find a way to handle this situation.
undefined
Just offset -> return (tp, fromIntegral offset)

let partitionMap = HM.fromList . V.toList $ tpWithLogId
let offsetsMap = Map.fromList . V.toList $ offsetsTuples
Log.info $ "loadOffsets for topic " <> Log.build topicName
<> ", partitionMap: " <> Log.build (show partitionMap)
<> ", offsetsMap: " <> Log.build (show offsetsMap)
-- update offsetsCache
modifyMVar_ offsetsCache $ return . Map.union offsetsMap
-- update partitionsMap
modifyMVar_ partitionsMap $ return . HM.union partitionMap
getTopicPartitions :: Set Word64 -> [[(TopicPartition, S.C_LogID)]] -> IORef Int -> IO ([(TopicPartition, S.C_LogID)])
getTopicPartitions lgs res topicNum
| S.null lgs = return $ concat res
| otherwise = do
let lgId = S.elemAt 0 lgs
(streamId, _) <- S.getStreamIdFromLogId ldClient lgId
modifyIORef' topicNum (+1)
partitions <- V.toList <$> S.listStreamPartitionsOrdered ldClient streamId
let topicName = T.pack $ S.showStreamName streamId
tpWithLogId = zipWith (\(_, logId) idx -> (mkTopicPartition topicName idx, logId)) partitions ([0..])
res' = tpWithLogId : res
-- remove these log ids from lgs because they all share the same streamId
lgs' = lgs S.\\ S.fromList (map snd partitions)
getTopicPartitions lgs' res' topicNum
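The heart of the new load path is Map.compose (containers >= 0.6.3.1): with partitionMap :: Map TopicPartition C_LogID and tpOffsets :: Map C_LogID Int64, Map.compose tpOffsets partitionMap yields a Map TopicPartition Int64, and any partition whose logId has no committed offset is simply absent from the result, which sidesteps the old code's FIXME about partitions without commit records. A self-contained toy example of the same shape:

import qualified Data.Map.Strict as Map

main :: IO ()
main = do
  let partitionMap = Map.fromList [("t-0", 101), ("t-1", 102), ("t-2", 103)] -- tp -> logId
      tpOffsets    = Map.fromList [(101 :: Int, 42 :: Int), (102, 7)]        -- logId -> offset
  -- "t-2" is dropped: logId 103 has no committed offset
  print (Map.compose tpOffsets partitionMap)
  -- fromList [("t-0",42),("t-1",7)]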

storeOffsets
:: GroupOffsetManager
@@ -98,14 +114,14 @@ storeOffsets gmm@GroupOffsetManager{..} topicName arrayOffsets = do

-- write checkpoints
let checkPoints = V.foldl' (\acc (_, logId, offset) -> Map.insert logId offset acc) Map.empty offsetsInfo
commitOffsets offsetStorage topicName checkPoints
commitOffsets offsetStorage groupName checkPoints
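-- NOTE: checkpoints are now keyed by groupName rather than topicName, which is what
-- lets loadOffsets offsetStorage groupName (in loadOffsetsFromStorage) find every
-- partition this group has committed, across all of its topics.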
Log.debug $ "consumer group " <> Log.build groupName <> " commit offsets {" <> Log.build (show checkPoints)
<> "} to topic " <> Log.build topicName

-- update cache
modifyMVar_ offsetsCache $ \cache -> do
modifyIORef' offsetsCache $ \cache -> do
let updates = V.foldl' (\acc (key, _, offset) -> Map.insert key (fromIntegral offset) acc) Map.empty offsetsInfo
return $ Map.union updates cache
Map.union updates cache

let suc = V.map (\(TopicPartition{topicPartitionIdx}, _, _) -> (topicPartitionIdx, K.NONE)) offsetsInfo
res = V.map (\(partitionIndex, errorCode) -> OffsetCommitResponsePartitionV0{..}) suc
@@ -119,25 +135,26 @@ getOffsetsInfo
getOffsetsInfo GroupOffsetManager{..} topicName requestOffsets = do
V.forM requestOffsets $ \OffsetCommitRequestPartitionV0{..} -> do
let tp = mkTopicPartition topicName partitionIndex
modifyMVar partitionsMap $ \mp -> do
case HM.lookup tp mp of
Nothing -> do
Log.info $ "can't find topic-partition " <> Log.build (show tp) <> " in partitionsMap: " <> Log.build (show mp)
-- read partitions and build partitionsMap
partitions <- S.listStreamPartitionsOrdered ldClient (S.transToTopicStreamName topicName)
Log.info $ "list all partitions for topic " <> Log.build topicName <> ": " <> Log.build (show partitions)
case partitions V.!? (fromIntegral partitionIndex) of
Nothing -> do
Log.info $ "consumer group " <> Log.build groupName <> " receive OffsetCommitRequestPartition with unknown topic or partion"
<> ", topic name: " <> Log.build topicName
<> ", partition: " <> Log.build partitionIndex
throw (ErrorCodeException K.UNKNOWN_TOPIC_OR_PARTITION)
-- ^ TODO: better response (and exception)
Just (_, logId) -> do
let partitionMap = HM.fromList . V.toList $ V.zipWith (\idx (_, lgId) -> ((mkTopicPartition topicName idx), lgId)) (V.fromList [0..]) partitions
mp' = HM.union partitionMap mp
return (mp', (tp, logId, fromIntegral committedOffset))
Just logId -> return (mp, (tp, logId, fromIntegral committedOffset))
mp <- readIORef partitionsMap
case Map.lookup tp mp of
Nothing -> do
Log.info $ "can't find topic-partition " <> Log.build (show tp) <> " in partitionsMap: " <> Log.build (show mp)
-- read partitions and build partitionsMap
partitions <- S.listStreamPartitionsOrdered ldClient (S.transToTopicStreamName topicName)
Log.info $ "list all partitions for topic " <> Log.build topicName <> ": " <> Log.build (show partitions)
case partitions V.!? (fromIntegral partitionIndex) of
Nothing -> do
Log.info $ "consumer group " <> Log.build groupName <> " receive OffsetCommitRequestPartition with unknown topic or partion"
<> ", topic name: " <> Log.build topicName
<> ", partition: " <> Log.build partitionIndex
throw (ErrorCodeException K.UNKNOWN_TOPIC_OR_PARTITION)
-- ^ TODO: better response (and exception)
Just (_, logId) -> do
let partitionMap = Map.fromList . V.toList $ V.zipWith (\idx (_, lgId) -> ((mkTopicPartition topicName idx), lgId)) (V.fromList [0..]) partitions
mp' = Map.union partitionMap mp
writeIORef partitionsMap mp'
return (tp, logId, fromIntegral committedOffset)
Just logId -> return (tp, logId, fromIntegral committedOffset)
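The Nothing branch above is a read-through cache: on a miss, rebuild the mapping from the source of truth (listStreamPartitionsOrdered) and write it back for subsequent lookups. A minimal generic sketch of the pattern (hypothetical helper, not HStream code):

import Data.IORef (IORef, modifyIORef', readIORef)
import qualified Data.Map.Strict as Map

readThrough :: Ord k => IORef (Map.Map k v) -> (k -> IO v) -> k -> IO v
readThrough ref rebuild k = do
  hit <- Map.lookup k <$> readIORef ref
  case hit of
    Just v  -> pure v
    Nothing -> do
      v <- rebuild k                       -- consult the source of truth
      modifyIORef' ref (Map.insert k v)    -- cache for next time
      pure v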

fetchOffsets
:: GroupOffsetManager
@@ -146,33 +163,31 @@ fetchOffsets
-> IO (KaArray OffsetFetchResponsePartitionV0)
fetchOffsets GroupOffsetManager{..} topicName partitions = do
let partitions' = fromMaybe V.empty (unKaArray partitions)
res <- withMVar offsetsCache $ \cache -> do
traverse
(
\ partitionIdx -> do
let key = mkTopicPartition topicName partitionIdx
in case Map.lookup key cache of
Just offset -> return $ OffsetFetchResponsePartitionV0
{ committedOffset = offset
, metadata = Nothing
, partitionIndex= partitionIdx
, errorCode = K.NONE
}
Nothing -> return $ OffsetFetchResponsePartitionV0
{ committedOffset = -1
, metadata = Nothing
, partitionIndex= partitionIdx
-- TODO: check the error code here
, errorCode = K.NONE
}
) partitions'

cache <- readIORef offsetsCache
res <- traverse (getOffset cache) partitions'
return $ KaArray {unKaArray = Just res}
where
getOffset cache partitionIdx = do
let key = mkTopicPartition topicName partitionIdx
in case Map.lookup key cache of
Just offset -> return $ OffsetFetchResponsePartitionV0
{ committedOffset = offset
, metadata = Nothing
, partitionIndex = partitionIdx
, errorCode = K.NONE
}
Nothing -> return $ OffsetFetchResponsePartitionV0
{ committedOffset = -1
, metadata = Nothing
, partitionIndex = partitionIdx
-- TODO: check the error code here
, errorCode = K.NONE
}

fetchAllOffsets :: GroupOffsetManager -> IO (KaArray K.OffsetFetchResponseTopicV0)
fetchAllOffsets GroupOffsetManager{..} = do
-- group offsets by TopicName
cachedOffset <- Map.foldrWithKey foldF Map.empty <$> readMVar offsetsCache
cachedOffset <- Map.foldrWithKey foldF Map.empty <$> readIORef offsetsCache
return . KaArray . Just . V.map makeTopic . V.fromList . Map.toList $ cachedOffset
where makePartition partition offset = OffsetFetchResponsePartitionV0
{ committedOffset = offset
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
@@ -171,6 +171,7 @@ library
, yaml
, Z-Data
, zoovisitor
, clock

default-language: GHC2021
default-extensions:
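The new clock dependency backs the System.Clock import used for the duration logging in GroupOffsetManager. A minimal sketch of that timing pattern:

import System.Clock (Clock (Monotonic), diffTimeSpec, getTime, toNanoSecs)

-- Run an action and return its result plus elapsed wall-clock milliseconds.
timedMs :: IO a -> IO (a, Integer)
timedMs action = do
  start <- getTime Monotonic
  x <- action
  end <- getTime Monotonic
  pure (x, toNanoSecs (end `diffTimeSpec` start) `div` 1000000)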
12 changes: 10 additions & 2 deletions hstream/app/kafka-server.hs
@@ -13,7 +13,7 @@ import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar,
readMVar)
import Control.Exception (handle)
import Control.Exception (Handler (Handler), catches)
import Control.Monad (forM, forM_, void)
import qualified Data.Map as Map
import Data.Maybe (isJust)
@@ -158,7 +158,10 @@ serve sc@ServerContext{..} netOpts = do
Gossip.Gossip -> return ()
_ -> do
getProtoTimestamp >>= \x -> upsertMeta @Proto.Timestamp M.clusterStartTimeId x metaHandle
handle (\(_ :: HE.RQLiteRowNotFound) -> return ()) $ deleteAllMeta @M.TaskAllocation metaHandle
-- FIXME: Why do we need to call deleteAll here?
-- Also, in CI, getRqResult (common/hstream/HStream/MetaStore/RqliteUtils.hs) may throw a RQLiteUnspecifiedErr
-- because more than one row is affected; why is that invalid?
deleteAllMeta @M.TaskAllocation metaHandle `catches` exceptionHandlers

Log.info "starting task detector"
TM.runTaskDetector $ TM.TaskDetector {
@@ -178,6 +181,11 @@
<> (if isJust (K.serverSaslOptions netOpts') then "SASL " else "")
<> "kafka server..."
K.runServer netOpts' sc K.unAuthedHandlers K.handlers
where
exceptionHandlers =
[ Handler $ \(_ :: HE.RQLiteRowNotFound) -> return ()
, Handler $ \(_ :: HE.RQLiteUnspecifiedErr) -> return ()
]
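handle scopes a single exception type, while catches tries a list of Handlers in order, which is what allows both RQLite error types above to be swallowed. A self-contained sketch of the same pattern (toy exception types, not the HStream ones; the pattern type annotations assume GHC2021, as the cabal file sets):

import Control.Exception (ArithException (Overflow), ErrorCall, Handler (Handler), catches, throwIO)

demo :: IO ()
demo =
  throwIO Overflow `catches`
    [ Handler $ \(_ :: ErrorCall)      -> putStrLn "ignored ErrorCall"
    , Handler $ \(_ :: ArithException) -> putStrLn "ignored ArithException"
    ]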

serveListeners
:: ServerContext
