From 588bd925521bbba503b01e0298326a544b0acea1 Mon Sep 17 00:00:00 2001
From: mu <59917266+4eUeP@users.noreply.github.com>
Date: Thu, 25 Jan 2024 16:08:20 +0800
Subject: [PATCH] Re: FetchHandler Try to cache reader state

---
 .../HStream/Kafka/Common/FetchManager.hs      |  75 +++
 .../HStream/Kafka/Server/Handler/Consume.hs   | 497 ++++++++++++------
 hstream-kafka/HStream/Kafka/Server/Types.hs   |  29 +-
 hstream-kafka/hstream-kafka.cabal             |   1 +
 4 files changed, 418 insertions(+), 184 deletions(-)
 create mode 100644 hstream-kafka/HStream/Kafka/Common/FetchManager.hs

diff --git a/hstream-kafka/HStream/Kafka/Common/FetchManager.hs b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs
new file mode 100644
index 000000000..36c56cd4f
--- /dev/null
+++ b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs
@@ -0,0 +1,75 @@
+module HStream.Kafka.Common.FetchManager
+  ( FetchContext (reader)
+  , FetchLogContext (..)
+  , getFetchLogCtx
+  , setFetchLogCtx
+  , clearFetchLogCtx
+  , getAllFetchLogs
+
+  , fakeFetchContext
+  , initFetchContext
+  ) where
+
+import qualified Data.HashMap.Strict as HM
+import           Data.Int
+import           Data.IORef
+import           Data.Vector         (Vector)
+import qualified Data.Vector         as V
+import           Foreign.ForeignPtr  (newForeignPtr_)
+import           Foreign.Ptr         (nullPtr)
+
+import qualified HStream.Kafka.Common.RecordFormat as K
+import qualified HStream.Store                     as S
+
+data FetchLogContext = FetchLogContext
+  { nextOffset :: Int64
+    -- ^ The expected next offset to be fetched
+  , remRecords :: Vector K.RecordFormat
+    -- ^ Remaining records of the last read batch
+  } deriving (Show)
+
+-- Thread-unsafe
+data FetchContext = FetchContext
+  { reader    :: !S.LDReader
+  , logCtxMap :: !(IORef (HM.HashMap S.C_LogID FetchLogContext))
+    -- ^ FetchLogContext for each partition/log
+  }
+
+getAllFetchLogs :: FetchContext -> IO [S.C_LogID]
+getAllFetchLogs ctx = HM.keys <$> readIORef ctx.logCtxMap
+
+getFetchLogCtx :: FetchContext -> S.C_LogID -> IO (Maybe FetchLogContext)
+getFetchLogCtx ctx logid = HM.lookup logid <$> readIORef ctx.logCtxMap
+
+setFetchLogCtx :: FetchContext -> S.C_LogID -> FetchLogContext -> IO ()
+setFetchLogCtx ctx logid logctx =
+  modifyIORef' ctx.logCtxMap $ HM.insert logid logctx
+
+clearFetchLogCtx :: FetchContext -> IO ()
+clearFetchLogCtx ctx = modifyIORef' ctx.logCtxMap $ const HM.empty
+
+-- Must be initialized later
+fakeFetchContext :: IO FetchContext
+fakeFetchContext = do
+  -- Trick to avoid using Maybe; must be initialized later
+  reader <- newForeignPtr_ nullPtr
+  FetchContext reader <$> newIORef HM.empty
+
+initFetchContext :: S.LDClient -> IO FetchContext
+initFetchContext ldclient = do
+  -- Reader used for fetch.
+  --
+  -- Currently, we only need one reader per connection because there will be
+  -- only one thread to fetch data.
+  --
+  -- TODO: also consider the following:
+  --
+  -- - use a pool of readers.
+  -- - create a reader (or a pool of readers) for each consumer group.
+  --
+  -- NOTE: maxLogs is set to 1000, which means the reader will fetch from at
+  -- most 1000 logs.
+  -- TODO: maybe we should set maxLogs dynamically according to the max number
+  -- of all fetch requests in this connection.
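+  --
+  -- A rough usage sketch of this module's API (the caller shown here is
+  -- hypothetical; the real flow lives in the fetch handler):
+  --
+  -- > ctx <- initFetchContext ldclient
+  -- > setFetchLogCtx ctx logid (FetchLogContext 42 V.empty)
+  -- > m <- getFetchLogCtx ctx logid  -- Just (FetchLogContext 42 ...)
+  -- > clearFetchLogCtx ctx           -- a non-continuation fetch drops all state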
+  !reader <- S.newLDReader ldclient 1000{-maxLogs-} (Just 10){-bufferSize-}
+  FetchContext reader <$> newIORef HM.empty
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
index 2d1670b31..de09fdf35 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -9,10 +9,11 @@ import Control.Monad
 import Data.ByteString (ByteString)
 import qualified Data.ByteString as BS
 import qualified Data.ByteString.Builder as BB
-import Data.Either (isRight)
 import Data.Int
 import Data.Maybe
+import Data.Text (Text)
 import qualified Data.Text as T
+import Data.Vector (Vector)
 import qualified Data.Vector as V
 import qualified Data.Vector.Hashtables as HT
 import qualified Data.Vector.Storable as VS
@@ -20,6 +21,7 @@ import GHC.Data.FastMutInt
 import GHC.Stack (HasCallStack)
 
 import qualified HStream.Base.Growing as GV
+import qualified HStream.Kafka.Common.FetchManager as K
 import qualified HStream.Kafka.Common.Metrics as M
 import qualified HStream.Kafka.Common.OffsetManager as K
 import qualified HStream.Kafka.Common.RecordFormat as K
@@ -36,82 +38,98 @@ import qualified Kafka.Protocol.Service as K
 
 -------------------------------------------------------------------------------
 
+-- {logid: ([RemRecord], [ReadRecord])}
 type RecordTable =
   HT.Dictionary (HT.PrimState IO)
                 VS.MVector
                 S.C_LogID
                 V.MVector
-                (GV.Growing V.Vector GV.RealWorld K.RecordFormat)
+                (Vector K.RecordFormat, GV.Growing Vector GV.RealWorld K.RecordFormat)
 
--- Tuple of (startLsn, tailLsn, highwaterOffset)
-type PartitionOffsetData = Either K.PartitionData (S.LSN, S.LSN, Int64)
+data LsnData
+  = LsnData S.LSN S.LSN Int64
+    -- ^ (startLsn, tailLsn, highwaterOffset)
+    --
+    -- NOTE: tailLsn is LSN_INVALID if the partition is empty
+  | ContReading (Vector K.RecordFormat) Int64
+    -- ^ Continue reading; no need to start the reader again
+    --
+    -- (remRecords, highwaterOffset)
+  | ErrPartitionData K.PartitionData
+    -- ^ Error partition response
+  deriving (Show)
+
+extractHiOffset :: LsnData -> Either K.PartitionData Int64
+extractHiOffset (LsnData _ _ o)      = Right o
+extractHiOffset (ContReading _ o)    = Right o
+extractHiOffset (ErrPartitionData d) = Left d
+
+isErrPartitionData :: LsnData -> Bool
+isErrPartitionData (ErrPartitionData _) = True
+isErrPartitionData _                    = False
 
 data Partition = Partition
   { logid   :: {-# UNPACK #-} !S.C_LogID
-  , elsn    :: {-# UNPACK #-} !PartitionOffsetData
+  , elsn    :: !LsnData
   , request :: !K.FetchPartition
-  }
+  } deriving (Show)
+
+data ReFetchRequest = ReFetchRequest
+  { topics     :: !(Vector (Text, Vector Partition))
+    -- Original request
+  , minBytes   :: !Int32
+  , maxWaitMs  :: !Int32
+  , maxBytes   :: !Int32
+    -- Helpful attrs
+  , contFetch  :: !Bool
+  , totalReads :: !Int
+  , allError   :: !Bool
+  } deriving (Show)
 
 -- NOTE: this behaviour is not the same as kafka broker
+--
+-- TODO
+--
+-- 1. What if r.maxBytes is <= 0?
 handleFetch
   :: HasCallStack
   => ServerContext -> K.RequestContext -> K.FetchRequest -> IO K.FetchResponse
-handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
+handleFetch sc@ServerContext{..} _ r_ = K.catchFetchResponseEx $ do
+  -- Currently, we use a per-connection reader (fetchReader) to read.
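+  --
+  -- A rough sketch of the flow below (all names are the ones defined in this
+  -- module):
+  --
+  -- > preProcessRequest : per partition, LsnData | ContReading | ErrPartitionData
+  -- > readMode1         : cached remRecords + storage reads -> RecordTable
+  -- > encodePartition   : RecordTable -> bytes (bounded by maxBytes), then
+  -- >                     setFetchLogCtx caches the next offset + remainder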
+  let fetchReader = fetchCtx.reader
+
   ---------------------------------------
   -- * Preprocess request
   ---------------------------------------
-  mutNumOfReads <- newFastMutInt 0 -- Total number of real reads
-  -- kafka broker just throw java.lang.RuntimeException if topics is null, here
-  -- we do the same.
-  let K.NonNullKaArray topicReqs = r.topics
-  topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do
-    -- Partition should be non-empty
-    let K.NonNullKaArray partitionReqs = t.partitions
-    orderedParts <- S.listStreamPartitionsOrdered scLDClient
-                      (S.transToTopicStreamName t.topic)
-    ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
-      M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
-        \counter -> void $ M.addCounter counter 1
-      let m_logid = orderedParts V.!? fromIntegral p.partition
-      case m_logid of
-        Nothing -> do
-          let elsn = errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION
-          -- Actually, the logid should be Nothing but 0, however, we won't
-          -- use it, so just set it to 0
-          pure $ Partition 0 (Left elsn) p
-        Just (_, logid) -> do
-          elsn <- getPartitionLsn scLDClient scOffsetManager logid p.partition
-                                  p.fetchOffset
-          when (isRight elsn) $ void $ atomicFetchAddFastMut mutNumOfReads 1
-          pure $ Partition logid elsn p
-    pure (t.topic, ps)
+  r <- preProcessRequest sc r_
 
-  numOfReads <- readFastMutInt mutNumOfReads
-  when (numOfReads < 1) $ do
-    respTopics <- V.forM topics $ \(topic, partitions) -> do
+  -- Fail fast: all partitions are in error
+  when r.allError $ do
+    respTopics <- V.forM r.topics $ \(topic, partitions) -> do
       respPartitionDatas <- V.forM partitions $ \partition -> do
         case partition.elsn of
-          Left pd -> pure pd
-          Right _ -> error "LogicError: this should not be right"
+          ErrPartitionData pd -> pure pd
+          x -> error $ "LogicError: this should not be " <> show x
      pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
    let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
+    -- Exit early
    throwIO $ K.FetchResponseEx resp
 
-  ---------------------------------------
-  -- * Start reading
-  --
-  -- Currently, we use a per-connection reader(fetchReader) to read.
-  ---------------------------------------
-  V.forM_ topics $ \(_, partitions) -> do
-    V.forM_ partitions $ \partition -> do
-      case partition.elsn of
-        Left _ -> pure ()
-        Right (startlsn, _, _) -> do
-          Log.debug1 $ "start reading log "
-                    <> Log.build partition.logid
-                    <> " from " <> Log.build startlsn
-          S.readerStartReading fetchReader partition.logid startlsn S.LSN_MAX
+  -- The client requested a new read rather than a continuation of the last one
+  when (not r.contFetch) $ do
+    -- Clear the context
+    K.clearFetchLogCtx fetchCtx
+    -- Start reading
+    V.forM_ r.topics $ \(_, partitions) -> do
+      V.forM_ partitions $ \partition -> do
+        case partition.elsn of
+          LsnData startlsn _ _ -> do
+            Log.debug1 $ "start reading log "
+                      <> Log.build partition.logid
+                      <> " from " <> Log.build startlsn
+            S.readerStartReading fetchReader partition.logid startlsn S.LSN_MAX
+          _ -> pure ()
 
   ---------------------------------------
   -- * Read records from storage
@@ -122,34 +140,23 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
   --
   -- FIXME: Do not setWaitOnlyWhenNoData if you are mostly focusing on
   -- throughput
-  -- Mode1
-  records <- readMode1 fetchReader
-
-  ---------------------------------------
-  -- * Process read records
-  --
-  -- TODO: what if client send two same topic but with different partitions?
-  ---------------------------------------
-  -- {logid: [RecordFormat]}
-  readRecords <- HT.initialize numOfReads :: IO RecordTable
-  forM_ records $ \record -> do
-    recordFormat <- K.runGet @K.RecordFormat record.recordPayload
-    let logid = record.recordAttr.recordAttrLogID
-    v <- maybe GV.new pure =<< (HT.lookup readRecords logid)
-    v' <- GV.append v recordFormat
-    HT.insert readRecords logid v'
+  -- FIXME: what if the client sends the same topic twice, with different
+  -- partitions?
+  -- {logid: ([RemRecord], [ReadRecord])}
+  readRecords <- readMode1 r serverOpts._storage fetchReader
 
   ---------------------------------------
   -- * Generate response
   ---------------------------------------
   mutMaxBytes <- newFastMutInt $ fromIntegral r.maxBytes
   mutIsFirstPartition <- newFastMutInt 1  -- TODO: improve this
-  respTopics <- V.forM topics $ \(topic, partitions) -> do
+  respTopics <- V.forM r.topics $ \(topic, partitions) -> do
     respPartitionDatas <- V.forM partitions $ \partition -> do
       let request = partition.request
-      case partition.elsn of
+      let e_hioffset = extractHiOffset partition.elsn
+      case e_hioffset of
         Left pd -> pure pd
-        Right (_startlsn, _endlsn, hioffset) -> do
+        Right hioffset -> do
           mgv <- HT.lookup readRecords partition.logid
           case mgv of
             Nothing ->
@@ -164,9 +171,18 @@
               -- logStartOffset now
               , logStartOffset = (-1)
               }
-            Just gv -> do
-              v <- GV.unsafeFreeze gv
-              bs <- encodePartition mutMaxBytes mutIsFirstPartition request v
+            Just (remv, gv) -> do
+              v <- if V.null remv
+                      then GV.unsafeFreeze gv
+                      -- TODO PERF
+                      else (remv <>) <$> GV.unsafeFreeze gv
+              (bs, m_offset, tokenIdx) <- encodePartition mutMaxBytes mutIsFirstPartition request v
+              K.setFetchLogCtx
+                fetchCtx
+                partition.logid
+                K.FetchLogContext{ nextOffset = fromMaybe (-1) m_offset
+                                 , remRecords = V.drop (tokenIdx + 1) v
+                                 }
               -- Stats
               let partLabel = (topic, T.pack . show $ request.partition)
               M.withLabel M.topicTotalSendBytes partLabel $ \counter -> void $
@@ -189,61 +205,119 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
      pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
   pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
-  where
-    -- Currently unused
-    readMode0 :: S.LDReader -> IO [S.DataRecord ByteString]
-    readMode0 reader = do
-      if r.minBytes <= 0 || r.maxWaitMs <= 0
-        then S.readerSetTimeout reader 0 -- nonblocking
-        else S.readerSetTimeout reader r.maxWaitMs
-      S.readerSetWaitOnlyWhenNoData reader
-      (_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
-        rs <- M.observeDuration M.topicReadStoreLatency $ S.readerRead reader 100
-        if null rs
-          then pure ((size, acc), False)
-          else do let size' = size + sum (map (K.recordBytesSize . (.recordPayload)) rs)
-                      acc' = acc <> rs
-                  if size' >= fromIntegral r.minBytes
-                    then pure ((size', acc'), False)
-                    else pure ((size', acc'), True)
-      pure records
-
-    readMode1 :: S.LDReader -> IO [S.DataRecord ByteString]
-    readMode1 reader = do
-      let storageOpts = serverOpts._storage
-          defTimeout = fromIntegral storageOpts.fetchReaderTimeout
-
-      if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately
-        then do S.readerSetTimeout reader 0 -- nonblocking
-                S.readerRead reader storageOpts.fetchMaxLen
-        else
-          if r.maxWaitMs > defTimeout
-            then do
-              S.readerSetTimeout reader defTimeout
-              rs1 <- M.observeDuration M.topicReadStoreLatency $
-                        S.readerRead reader storageOpts.fetchMaxLen
-              let size = sum (map (K.recordBytesSize . (.recordPayload)) rs1)
-              if size >= fromIntegral r.minBytes
-                then pure rs1
-                else do S.readerSetTimeout reader (r.maxWaitMs - defTimeout)
-                        rs2 <- M.observeDuration M.topicReadStoreLatency $
-                                  S.readerRead reader storageOpts.fetchMaxLen
-                        pure $ rs1 <> rs2
-            else do
-              S.readerSetTimeout reader r.maxWaitMs
-              M.observeDuration M.topicReadStoreLatency $ S.readerRead reader storageOpts.fetchMaxLen
 
 -------------------------------------------------------------------------------
 
--- Return tuple of (startLsn, tailLsn, highwaterOffset)
---
--- NOTE: tailLsn is LSN_INVALID if the partition is empty
+preProcessRequest :: ServerContext -> K.FetchRequest -> IO ReFetchRequest
+preProcessRequest ServerContext{..} r = do
+  -- The kafka broker just throws java.lang.RuntimeException if topics is
+  -- null; here we do the same.
+  let K.NonNullKaArray topicReqs = r.topics
+  mutContFetch <- newFastMutInt 1   -- Bool
+  mutNumOfReads <- newFastMutInt 0  -- Total number of reads
+  topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do
+    -- Partition should be non-empty
+    let K.NonNullKaArray partitionReqs = t.partitions
+    -- FIXME: we could also cache this in FetchContext; however, we then need
+    -- to consider the following: what if someone deletes the topic?
+    orderedParts <- S.listStreamPartitionsOrdered scLDClient
+                      (S.transToTopicStreamName t.topic)
+    ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
+      M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
+        \counter -> void $ M.addCounter counter 1
+      let m_logid = orderedParts V.!? fromIntegral p.partition
+      case m_logid of
+        Nothing -> do
+          let elsn = ErrPartitionData $
+                errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION
+          -- Actually, the logid should be Nothing rather than 0; however,
+          -- since we never use it, just set it to 0
+          pure $ Partition 0 elsn p
+        Just (_, logid) -> do
+          void $ atomicFetchAddFastMut mutNumOfReads 1
+          contFetch <- readFastMutInt mutContFetch
+          elsn <-
+            if contFetch == 0
+               then getPartitionLsn scLDClient scOffsetManager logid p.partition
+                                    p.fetchOffset
+               else do
+                 m_logCtx <- K.getFetchLogCtx fetchCtx logid
+                 case m_logCtx of
+                   Nothing -> do -- Cache miss
+                     writeFastMutInt mutContFetch 0
+                     getPartitionLsn scLDClient scOffsetManager
+                                     logid p.partition p.fetchOffset
+                   Just logCtx ->
+                     if (logCtx.nextOffset /= p.fetchOffset)
+                        then do
+                          writeFastMutInt mutContFetch 0
+                          getPartitionLsn scLDClient scOffsetManager logid p.partition
+                                          p.fetchOffset
+                        else do
+                          m <- K.getLatestOffsetWithLsn scOffsetManager logid
+                          case m of
+                            Just (latestOffset, _tailLsn) -> do
+                              let highwaterOffset = latestOffset + 1
+                              pure $ ContReading logCtx.remRecords highwaterOffset
+                            Nothing -> do
+                              Log.debug $ "Continue reading, but logid "
+                                       <> Log.build logid <> " is empty"
+                              pure $ ErrPartitionData $
+                                errorPartitionResponse p.partition
+                                                       K.OFFSET_OUT_OF_RANGE
+          pure $ Partition logid elsn p
+    pure (t.topic, ps)
+  contFetch <- readFastMutInt mutContFetch
+  numOfReads <- readFastMutInt mutNumOfReads
+  -- TODO PERF: We could bypass looping over all topics (using a global
+  -- mutAllError). However, this would make the code more complex.
+  let doesAllError ts = all (all (isErrPartitionData . (.elsn)) . 
snd) ts + if contFetch == 0 + then do + pure $ ReFetchRequest{ topics = topics + , minBytes = r.minBytes + , maxBytes = r.maxBytes + , maxWaitMs = r.maxWaitMs + , contFetch = False + , totalReads = numOfReads + , allError = doesAllError topics + } + else do cacheNumOfReads <- length <$> K.getAllFetchLogs fetchCtx + if numOfReads == cacheNumOfReads + then + pure $ ReFetchRequest{ topics = topics + , minBytes = r.minBytes + , maxBytes = r.maxBytes + , maxWaitMs = r.maxWaitMs + , contFetch = True + , totalReads = numOfReads + , allError = doesAllError topics + } + else do + ts <- forM topics $ \(tn, ps) -> do + ps' <- forM ps $ \p -> do + case p.elsn of + ContReading _ _ -> do + elsn <- getPartitionLsn scLDClient scOffsetManager p.logid + p.request.partition + p.request.fetchOffset + pure $ p{elsn = elsn} + _ -> pure p + pure (tn, ps') + pure $ ReFetchRequest{ topics = ts + , minBytes = r.minBytes + , maxBytes = r.maxBytes + , maxWaitMs = r.maxWaitMs + , contFetch = False + , totalReads = numOfReads + , allError = doesAllError ts + } + getPartitionLsn :: S.LDClient -> K.OffsetManager -> S.C_LogID -> Int32 -> Int64 -- ^ kafka start offset - -> IO PartitionOffsetData + -> IO LsnData getPartitionLsn ldclient om logid partition offset = do m <- K.getLatestOffsetWithLsn om logid case m of @@ -256,23 +330,90 @@ getPartitionLsn ldclient om logid partition offset = do (_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict Log.debug1 $ "FindKey result " <> Log.build logid <> ": " <> Log.build startLsn - pure $ Right (startLsn, tailLsn, highwaterOffset) + pure $ LsnData startLsn tailLsn highwaterOffset | offset == latestOffset -> - pure $ Right (tailLsn, tailLsn, highwaterOffset) + pure $ LsnData tailLsn tailLsn highwaterOffset | offset == highwaterOffset -> - pure $ Right (tailLsn + 1, tailLsn, highwaterOffset) + pure $ LsnData (tailLsn + 1) tailLsn highwaterOffset | offset > highwaterOffset -> - pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE + pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE -- ghc is not smart enough to detact my partten matching is complete | otherwise -> error "This should not be reached (getPartitionLsn)" Nothing -> do Log.debug $ "Partition " <> Log.build logid <> " is empty" if offset == 0 - then pure $ Right (S.LSN_MIN, S.LSN_INVALID, 0) - else pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE + then pure $ LsnData S.LSN_MIN S.LSN_INVALID 0 + else pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE + +readMode1 + :: ReFetchRequest + -> StorageOptions + -> S.LDReader + -> IO RecordTable +readMode1 r storageOpts reader = do + recordTable <- HT.initialize r.totalReads :: IO RecordTable + mutRemSize <- newFastMutInt 0 + when r.contFetch $ do + forM_ r.topics $ \(_, partitions) -> + forM_ partitions $ \p -> do + case p.elsn of + ContReading remRecords _ -> do + void $ atomicFetchAddFastMut mutRemSize $ V.sum $ + V.map (BS.length . K.unCompactBytes . 
(.recordBytes)) remRecords
+          -- [TAG_NEV]: Make sure we do not insert an empty vector into the
+          -- table, since we assume the vector is non-empty in `encodePartition`
+          when (not $ V.null remRecords) $
+            insertRemRecords recordTable p.logid remRecords
+        x -> Log.fatal $
+          "LogicError: this should not be reached, " <> Log.buildString' x
+  remsize <- readFastMutInt mutRemSize
+  if remsize > fromIntegral r.maxBytes -- assume r.maxBytes > 0
+     then pure recordTable
+     else doRead recordTable
+  where
+    doRead recordTable = do
+      let defTimeout = fromIntegral storageOpts.fetchReaderTimeout
+
+      if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately
+         then do S.readerSetTimeout reader 0  -- nonblocking
+                 insertRecords recordTable
+                   =<< S.readerRead reader storageOpts.fetchMaxLen
+         else
+           if r.maxWaitMs > defTimeout
+              then do
+                S.readerSetTimeout reader defTimeout
+                rs1 <- M.observeDuration M.topicReadStoreLatency $
+                         S.readerRead reader storageOpts.fetchMaxLen
+                insertRecords recordTable rs1
+                -- FIXME: this size is not accurate because of the CompactBytes
+                -- See: K.recordBytesSize
+                let size = sum (map (K.recordBytesSize . (.recordPayload)) rs1)
+                when (size < fromIntegral r.minBytes) $ do
+                  S.readerSetTimeout reader (r.maxWaitMs - defTimeout)
+                  rs2 <- M.observeDuration M.topicReadStoreLatency $
+                           S.readerRead reader storageOpts.fetchMaxLen
+                  insertRecords recordTable rs2
+              else do
+                S.readerSetTimeout reader r.maxWaitMs
+                rs <- M.observeDuration M.topicReadStoreLatency $
+                        S.readerRead reader storageOpts.fetchMaxLen
+                insertRecords recordTable rs
+      pure recordTable
+
+    insertRemRecords :: RecordTable -> S.C_LogID -> Vector K.RecordFormat -> IO ()
+    insertRemRecords table logid records = do
+      (rv, v) <- maybe ((V.empty, ) <$> GV.new) pure =<< (HT.lookup table logid)
+      HT.insert table logid (rv <> records, v)
+
+    insertRecords :: RecordTable -> [S.DataRecord ByteString] -> IO ()
+    insertRecords table records =
+      forM_ records $ \record -> do
+        recordFormat <- K.runGet @K.RecordFormat record.recordPayload
+        let logid = record.recordAttr.recordAttrLogID
+        (rv, v) <- maybe ((V.empty, ) <$> GV.new) pure =<< (HT.lookup table logid)
+        v' <- GV.append v recordFormat
+        HT.insert table logid (rv, v')
 
--- Note this function's behaviour is not the same as kafka broker
---
 -- In kafka broker, regarding the format on disk, the broker will return
 -- the message format according to the fetch api version. Which means
 --
@@ -284,21 +425,25 @@
 -- Here, we donot handle the fetch api version, we just return the message
 -- format according to the message format on disk.
 --
--- However, if you always use RecordBath for appending and reading, it
+-- However, if you always use RecordBatch for appending and reading, it
 -- won't be a problem.
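+--
+-- A worked example of the size budgeting below (the numbers are made up):
+-- with maxBytes = 100 and a first batch of 150 bytes, the first partition
+-- still returns the whole batch (mutMaxBytes goes negative), so a client
+-- whose max bytes is smaller than a single batch can still make progress;
+-- every later partition then sees maxBytes <= 0 and gets an empty result.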
encodePartition
  :: FastMutInt
  -> FastMutInt
  -> K.FetchPartition
-  -> V.Vector K.RecordFormat
-  -> IO ByteString
+  -> Vector K.RecordFormat
+  -> IO (ByteString, Maybe Int64, Int)
+  -- ^ (encoded bytes, next offset, taken vector index)
+  --
+  -- taken vector index: -1 means no vector was taken; otherwise, the index
+  -- of the last vector taken
encodePartition mutMaxBytes mutIsFirstPartition p v = do
  maxBytes <- readFastMutInt mutMaxBytes
-  if maxBytes > 0 then doEncode maxBytes else pure ""
+  if maxBytes > 0 then doEncode maxBytes else pure ("", Nothing, (-1))
  where
    doEncode maxBytes = do
      isFristPartition <- readFastMutInt mutIsFirstPartition
-      (fstRecordBytes, vs) <- reFormat
+      (fstRecordBytes, vs) <- trySeek
      let fstLen = BS.length fstRecordBytes
      if isFristPartition == 1
         -- First partition
@@ -306,51 +451,65 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do
         then do
           writeFastMutInt mutIsFirstPartition 0 -- next partition should not be the first
           if fstLen >= maxBytes
              then do writeFastMutInt mutMaxBytes (-1)
-                     pure fstRecordBytes
+                     mo <- K.decodeNextRecordOffset fstRecordBytes
+                     pure (fstRecordBytes, mo, 0)
              else if fstLen >= (fromIntegral p.partitionMaxBytes)
                      then do void $ atomicFetchAddFastMut mutMaxBytes (-fstLen)
-                             pure fstRecordBytes
+                             mo <- K.decodeNextRecordOffset fstRecordBytes
+                             pure (fstRecordBytes, mo, 0)
                      else doEncodeElse fstRecordBytes vs
         -- Not the first partition
         else do
           if fstLen <= maxBytes
             then doEncodeElse fstRecordBytes vs
-            else pure ""
+            else pure ("", Nothing, (-1))
 
-    doEncodeElse fstRecordBytes vs = do
-      let fstLen = BS.length fstRecordBytes
+    doEncodeElse fstBs vs = do
+      let fstLen = BS.length fstBs
      void $ atomicFetchAddFastMut mutMaxBytes (-fstLen)
      mutPartitionMaxBytes <-
        newFastMutInt (fromIntegral p.partitionMaxBytes - fstLen)
 
-      bb <- V.foldM (\b r -> do
-        let rbs = K.unCompactBytes r.recordBytes
-            rlen = BS.length rbs
-        curMaxBytes <- atomicFetchAddFastMut mutMaxBytes (-rlen)
-        curPartMaxBytes <- atomicFetchAddFastMut mutPartitionMaxBytes (-rlen)
-        -- take a negative number of bytes will return an empty ByteString
-        let rbs' = BS.take (min curPartMaxBytes curMaxBytes) rbs
-        if BS.null rbs'
-          then pure b
-          else pure $ b <> BB.byteString rbs'
-        ) (BB.byteString fstRecordBytes) vs
-
-      pure $ BS.toStrict $ BB.toLazyByteString bb
-
-    reFormat = do
-      let (rf :: K.RecordFormat, vs) =
-            -- This should not be Nothing, because if we found the key
-            -- in `readRecords`, it means we have at least one record
-            -- in this
+      (bb, lastOffset', takenVecIdx) <-
+        vecFoldWhileM vs (BB.byteString fstBs, Left fstBs, 0) $ \(b, lb, i) r -> do
+          -- FIXME: Could this possibly be multiple BatchRecords?
+          let rbs = K.unCompactBytes r.recordBytes
+              rlen = BS.length rbs
+          curMaxBytes <- atomicFetchAddFastMut mutMaxBytes (-rlen)
+          curPartMaxBytes <- atomicFetchAddFastMut mutPartitionMaxBytes (-rlen)
+          let capLen = min curPartMaxBytes curMaxBytes
+              -- taking a negative number of bytes returns an empty ByteString
+              rbs' = BS.take capLen rbs
+              b' = b <> BB.byteString rbs'
+          if capLen < rlen
+             then do
+               mo1 <- K.decodeNextRecordOffset rbs'
+               case mo1 of
+                 Just _ -> pure ((b', Right mo1, i), False)
+                 Nothing -> do
+                   mo2 <- K.decodeNextRecordOffset (fromLeft' lb)
+                   pure ((b', Right mo2, i), False)
+             else pure ((b', Left rbs, i + 1), True)
+      lastOffset <- either K.decodeNextRecordOffset pure lastOffset'
+
+      pure (BS.toStrict $ BB.toLazyByteString bb, lastOffset, takenVecIdx)
+
+    -- Try to bypass the records if the fetch offset is not the first record
+    -- in the batch.
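+    --
+    -- For example (hypothetical numbers, mirroring the arithmetic below): if
+    -- the first v0/v1 MessageSet records offset 9 for its last message and a
+    -- batchLength of 10, it starts at absolute offset 9 + 1 - 10 = 0; a
+    -- fetchOffset of 4 then seeks past 4 messages before returning bytes.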
+ trySeek = do + let (fstRecord :: K.RecordFormat, vs) = + -- [TAG_NEV]: This should not be Nothing, because if we found the + -- key in `readRecords`, it means we have at least one record in + -- this. fromMaybe (error "LogicError: got empty vector value") - (V.uncons v) - bytesOnDisk = K.unCompactBytes rf.recordBytes + (V.uncons v) + bytesOnDisk = K.unCompactBytes fstRecord.recordBytes -- only the first MessageSet need to to this seeking magic <- K.decodeRecordMagic bytesOnDisk fstRecordBytes <- if | magic >= 2 -> pure bytesOnDisk | otherwise -> do - let absStartOffset = rf.offset + 1 - fromIntegral rf.batchLength + let absStartOffset = fstRecord.offset + 1 - fromIntegral fstRecord.batchLength offset = p.fetchOffset - absStartOffset if offset > 0 then do @@ -372,7 +531,17 @@ errorPartitionResponse partitionIndex ec = K.PartitionData } {-# INLINE errorPartitionResponse #-} -foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a -foldWhileM !a f = do - (a', b) <- f a - if b then foldWhileM a' f else pure a' +------------------------------------------------------------------------------- + +-- NOTE: condition is True -> continue; False -> break +vecFoldWhileM :: Monad m => Vector b -> a -> (a -> b -> m (a, Bool)) -> m a +vecFoldWhileM !bs !a !f = + case V.uncons bs of + Nothing -> pure a + Just (b, bs') -> do + (a', cont) <- f a b + if cont then vecFoldWhileM bs' a' f else pure a' + +fromLeft' :: Either a b -> a +fromLeft' (Left x) = x +fromLeft' _ = error "This should not be reached (fromLeft')" diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs index 48dd28bd0..6ee570a7e 100644 --- a/hstream-kafka/HStream/Kafka/Server/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Types.hs @@ -12,6 +12,9 @@ import Foreign.Ptr (nullPtr) import HStream.Common.Server.HashRing (LoadBalanceHashRing, initializeHashRing) import HStream.Gossip.Types (GossipContext) +import HStream.Kafka.Common.FetchManager (FetchContext, + fakeFetchContext, + initFetchContext) import HStream.Kafka.Common.OffsetManager (OffsetManager, initOffsetReader, newOffsetManager) @@ -39,7 +42,7 @@ data ServerContext = ServerContext , kafkaBrokerConfigs :: !KC.KafkaBrokerConfigs -- { per connection, see 'initConnectionContext' , scOffsetManager :: !OffsetManager - , fetchReader :: !S.LDReader + , fetchCtx :: !FetchContext -- } per connection end } @@ -59,9 +62,10 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do epochHashRing <- initializeHashRing gossipContext scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID + -- must be initialized later offsetManager <- newOffsetManager ldclient -- Trick to avoid use maybe, must be initialized later - fetchReader <- newForeignPtr_ nullPtr + fetchCtx <- fakeFetchContext return ServerContext @@ -78,7 +82,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do , scGroupCoordinator = scGroupCoordinator , kafkaBrokerConfigs = _kafkaBrokerConfigs , scOffsetManager = offsetManager - , fetchReader = fetchReader + , fetchCtx = fetchCtx } initConnectionContext :: ServerContext -> IO ServerContext @@ -86,21 +90,6 @@ initConnectionContext sc = do -- Since the Reader inside OffsetManger is thread-unsafe, for each connection -- we create a new Reader. !om <- initOffsetReader $ scOffsetManager sc + !fc <- initFetchContext (scLDClient sc) - -- Reader used for fetch. - -- - -- Currently, we only need one reader per connection because there will be - -- only one thread to fetch data. 
- -- - -- TODO: also considering the following: - -- - -- - use a pool of readers. - -- - create a reader(or pool of readers) for each consumer group. - -- - -- NOTE: the maxLogs is set to 1000, which means the reader will fetch at most - -- 1000 logs. - -- TODO: maybe we should set maxLogs dynamically according to the max number - -- of all fetch requests in this connection. - !reader <- S.newLDReader (scLDClient sc) 1000{-maxLogs-} (Just 10){-bufferSize-} - - pure sc{scOffsetManager = om, fetchReader = reader} + pure sc{scOffsetManager = om, fetchCtx = fc} diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index dfb068a02..9728a5064 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -123,6 +123,7 @@ library exposed-modules: HStream.Kafka.Client.Api HStream.Kafka.Client.Cli + HStream.Kafka.Common.FetchManager HStream.Kafka.Common.KafkaException HStream.Kafka.Common.Metrics HStream.Kafka.Common.OffsetManager
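
For reference, `vecFoldWhileM` added in Consume.hs above is a small early-exit
monadic fold (the returned Bool means "continue"). A minimal usage sketch under
that reading; `sumUpTo` is a hypothetical helper, not part of this patch:

    import Data.Vector (Vector)

    -- Sum elements left-to-right, stopping before the total would exceed cap.
    sumUpTo :: Monad m => Int -> Vector Int -> m Int
    sumUpTo cap xs = vecFoldWhileM xs 0 $ \acc x ->
      let acc' = acc + x
      in pure (if acc' > cap then acc else acc', acc' <= cap)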