diff --git a/common/base/HStream/Logger.hs b/common/base/HStream/Logger.hs index 2bc65e946..225922d59 100644 --- a/common/base/HStream/Logger.hs +++ b/common/base/HStream/Logger.hs @@ -23,6 +23,7 @@ module HStream.Logger -- * Log function , trace , debug + , debug1 , info , warning , fatal @@ -120,6 +121,9 @@ pattern WARNING = Level 30 pattern INFO :: Level pattern INFO = Level 20 +pattern DEBUG1 :: Level +pattern DEBUG1 = Level 11 + pattern DEBUG :: Level pattern DEBUG = Level 10 @@ -134,6 +138,7 @@ instance Show Level where show FATAL = "fatal" show WARNING = "warning" show INFO = "info" + show DEBUG1 = "debug1" show DEBUG = "debug" show TRACE = "trace" show NOTSET = "notset" @@ -148,6 +153,7 @@ instance Read Level where Read.Ident "fatal" -> FATAL Read.Ident "warning" -> WARNING Read.Ident "info" -> INFO + Read.Ident "debug1" -> DEBUG1 Read.Ident "debug" -> DEBUG Read.Ident "trace" -> TRACE x -> errorWithoutStackTrace $ "cannot parse log level" <> show x @@ -200,6 +206,7 @@ defaultColoredFmt time level content cstack (ThreadId tid#) = case level of -- TODO: color for trace DEBUG -> color Cyan b + DEBUG1 -> color Cyan b INFO -> color Magenta b WARNING -> color Yellow b FATAL -> color Red b @@ -222,6 +229,7 @@ defaultFmtLevel level = case level of WARNING -> "WARNING" INFO -> "INFO" DEBUG -> "DEBUG" + DEBUG1 -> "DEBUG1" TRACE -> "TRACE" NOTSET -> "NOTSET" level' -> "LEVEL" <> Log.toLogStr (unLevel level') @@ -369,6 +377,9 @@ trace = logBylevel False TRACE callStack debug :: HasCallStack => Log.LogStr -> IO () debug = logBylevel False DEBUG callStack +debug1 :: HasCallStack => Log.LogStr -> IO () +debug1 = logBylevel False DEBUG1 callStack + info :: HasCallStack => Log.LogStr -> IO () info = logBylevel False INFO callStack diff --git a/common/hstream/HStream/Utils/Converter.hs b/common/hstream/HStream/Utils/Converter.hs index b842a84a1..51c158a96 100644 --- a/common/hstream/HStream/Utils/Converter.hs +++ b/common/hstream/HStream/Utils/Converter.hs @@ -14,6 +14,7 @@ module HStream.Utils.Converter , bs2str , lbs2text , int2cbytes + , intToCBytesWithPadding , cBytesToText , cbytes2bs , cBytesToLazyText @@ -48,6 +49,7 @@ import qualified Data.Aeson as Aeson import Data.Bifunctor (bimap) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BL +import Data.Int import qualified Data.Map as M import qualified Data.Map.Strict as Map import Data.Scientific (toRealFloat) @@ -144,6 +146,12 @@ int2cbytes :: (Integral a, Bounded a) => a -> ZCB.CBytes int2cbytes = ZCB.buildCBytes . Build.int {-# INLINE int2cbytes #-} +intToCBytesWithPadding :: Int64 -> ZCB.CBytes +intToCBytesWithPadding = + -- The max length of Word64 is 20: length $ show (maxBound :: Word64) + ZCB.buildCBytes . Build.intWith (Build.IFormat 20 Build.ZeroPadding False) +{-# INLINE intToCBytesWithPadding #-} + cBytesToText :: ZCB.CBytes -> Text cBytesToText = Text.pack . ZCB.unpack diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index df064f666..1e25326a6 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -74,6 +74,8 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do case elsn of Left _ -> pure () Right (startlsn, _, _) -> do + Log.debug1 $ "start reading log " + <> Log.build logid <> " from " <> Log.build startlsn S.readerStartReading reader logid startlsn S.LSN_MAX -- Read records from storage @@ -148,8 +150,12 @@ getPartitionLsn ldclient om logid partition offset = do Just (latestOffset, tailLsn) -> do let highwaterOffset = latestOffset + 1 if | offset < latestOffset -> do - let key = U.int2cbytes offset + let key = U.intToCBytesWithPadding offset + Log.debug1 $ "Try findKey " <> Log.buildString' key <> " in logid " + <> Log.build logid (_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict + Log.debug1 $ "FindKey result " <> Log.build logid <> ": " + <> Log.build startLsn pure $ Right (startLsn, tailLsn, highwaterOffset) | offset == latestOffset -> pure $ Right (tailLsn, tailLsn, highwaterOffset) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index d5d24a553..f72fe3743 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -12,6 +12,7 @@ import Data.Word import qualified HStream.Kafka.Common.OffsetManager as K import qualified HStream.Kafka.Common.RecordFormat as K import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Logger as Log import qualified HStream.Store as S import qualified HStream.Utils as U import qualified Kafka.Protocol.Encoding as K @@ -51,11 +52,15 @@ handleProduceV2 ServerContext{..} _ K.ProduceRequestV2{..} = do partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV2{..} -> do let Just (_, logid) = partitions V.!? (fromIntegral index) -- TODO: handle Nothing let Just recordBytes' = recordBytes -- TODO: handle Nothing - + Log.debug1 $ "Append to logid " <> Log.build logid + <> "(" <> Log.build index <> ")" -- Wirte appends (S.AppendCompletion{..}, offset) <- appendRecords True scLDClient scOffsetManager logid recordBytes' + Log.debug1 $ "Append done " <> Log.build appendCompLogID + <> ", lsn: " <> Log.build appendCompLSN + -- TODO: logAppendTimeMs, only support LogAppendTime now pure $ K.PartitionProduceResponseV2 index K.NONE offset appendCompTimestamp @@ -99,7 +104,7 @@ appendRecords shouldValidateCrc ldclient om logid bs = do K.withOffsetN om logid (fromIntegral batchLength) $ \o -> do let startOffset = o + 1 - fromIntegral batchLength records' = K.modifyBatchRecordsOffset (+ startOffset) records - let appendKey = U.int2cbytes o + let appendKey = U.intToCBytesWithPadding o appendAttrs = Just [(S.KeyTypeFindKey, appendKey)] storedBs = K.encodeBatchRecords records' -- FIXME unlikely overflow: convert batchLength from Int to Int32 diff --git a/hstream-store/test/HStream/StoreSpec.hs b/hstream-store/test/HStream/StoreSpec.hs index d9ca648da..4e275b73e 100644 --- a/hstream-store/test/HStream/StoreSpec.hs +++ b/hstream-store/test/HStream/StoreSpec.hs @@ -77,22 +77,22 @@ base = describe "Base" $ do void $ S.appendCompressedBS client randlogid "p" S.CompressionNone (Just []) sn0 <- S.appendCompLSN <$> S.appendCompressedBS client randlogid "p0" S.CompressionNone - (Just [(S.KeyTypeFindKey, "0")]) + (Just [(S.KeyTypeFindKey, "00")]) sn1 <- S.appendCompLSN <$> S.appendCompressedBS client randlogid "p1" S.CompressionNone - (Just [(S.KeyTypeFindKey, "1")]) + (Just [(S.KeyTypeFindKey, "01")]) sn2 <- S.appendCompLSN <$> S.appendCompressedBS client randlogid "p2" S.CompressionNone - (Just [(S.KeyTypeFindKey, "2")]) + (Just [(S.KeyTypeFindKey, "02")]) void $ S.appendCompressedBS client randlogid "p" S.CompressionNone Nothing - (lo0, hi0) <- S.findKey client randlogid "0" S.FindKeyStrict + (lo0, hi0) <- S.findKey client randlogid "00" S.FindKeyStrict lo0 `shouldBe` S.LSN_INVALID hi0 `shouldBe` sn0 - (lo1, hi1) <- S.findKey client randlogid "1" S.FindKeyStrict + (lo1, hi1) <- S.findKey client randlogid "01" S.FindKeyStrict lo1 `shouldBe` sn0 hi1 `shouldBe` sn1 - (lo2, hi2) <- S.findKey client randlogid "2" S.FindKeyStrict + (lo2, hi2) <- S.findKey client randlogid "02" S.FindKeyStrict lo2 `shouldBe` sn1 hi2 `shouldBe` sn2