kafka: correct appended key interaction with findKey (#1686)
* add intToCBytesWithPadding

* add debug1 loglevel

* Fix findKey
4eUeP authored Nov 17, 2023
1 parent 811f8d4 commit 869c0a1
Showing 5 changed files with 39 additions and 9 deletions.
11 changes: 11 additions & 0 deletions common/base/HStream/Logger.hs
@@ -23,6 +23,7 @@ module HStream.Logger
-- * Log function
, trace
, debug
, debug1
, info
, warning
, fatal
@@ -120,6 +121,9 @@ pattern WARNING = Level 30
pattern INFO :: Level
pattern INFO = Level 20

pattern DEBUG1 :: Level
pattern DEBUG1 = Level 11

pattern DEBUG :: Level
pattern DEBUG = Level 10

@@ -134,6 +138,7 @@ instance Show Level where
show FATAL = "fatal"
show WARNING = "warning"
show INFO = "info"
show DEBUG1 = "debug1"
show DEBUG = "debug"
show TRACE = "trace"
show NOTSET = "notset"
@@ -148,6 +153,7 @@ instance Read Level where
Read.Ident "fatal" -> FATAL
Read.Ident "warning" -> WARNING
Read.Ident "info" -> INFO
Read.Ident "debug1" -> DEBUG1
Read.Ident "debug" -> DEBUG
Read.Ident "trace" -> TRACE
x -> errorWithoutStackTrace $ "cannot parse log level" <> show x
@@ -200,6 +206,7 @@ defaultColoredFmt time level content cstack (ThreadId tid#) =
case level of
-- TODO: color for trace
DEBUG -> color Cyan b
DEBUG1 -> color Cyan b
INFO -> color Magenta b
WARNING -> color Yellow b
FATAL -> color Red b
@@ -222,6 +229,7 @@ defaultFmtLevel level = case level of
WARNING -> "WARNING"
INFO -> "INFO"
DEBUG -> "DEBUG"
DEBUG1 -> "DEBUG1"
TRACE -> "TRACE"
NOTSET -> "NOTSET"
level' -> "LEVEL" <> Log.toLogStr (unLevel level')
@@ -369,6 +377,9 @@ trace = logBylevel False TRACE callStack
debug :: HasCallStack => Log.LogStr -> IO ()
debug = logBylevel False DEBUG callStack

debug1 :: HasCallStack => Log.LogStr -> IO ()
debug1 = logBylevel False DEBUG1 callStack

info :: HasCallStack => Log.LogStr -> IO ()
info = logBylevel False INFO callStack

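For orientation (my reading of the numbers above, not text from the commit): DEBUG1 sits at 11, between DEBUG (10) and INFO (20), so a DEBUG1 threshold keeps the new append/findKey traces while still filtering ordinary debug output. A minimal self-contained sketch of that kind of numeric threshold filtering, assuming the usual "emit if the message level is at or above the threshold" rule rather than quoting HStream.Logger internals:

```haskell
-- Illustrative sketch only, not HStream.Logger code.
newtype Level = Level Int deriving (Eq, Ord, Show)

debugLevel, debug1Level, infoLevel :: Level
debugLevel  = Level 10
debug1Level = Level 11
infoLevel   = Level 20

-- Emit a message only when its level is at or above the configured threshold.
shouldLog :: Level -> Level -> Bool
shouldLog threshold msgLevel = msgLevel >= threshold

main :: IO ()
main = do
  print (shouldLog debug1Level debugLevel)   -- False: plain debug stays hidden
  print (shouldLog debug1Level debug1Level)  -- True: the new debug1 traces show
  print (shouldLog debug1Level infoLevel)    -- True
```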
8 changes: 8 additions & 0 deletions common/hstream/HStream/Utils/Converter.hs
@@ -14,6 +14,7 @@ module HStream.Utils.Converter
, bs2str
, lbs2text
, int2cbytes
, intToCBytesWithPadding
, cBytesToText
, cbytes2bs
, cBytesToLazyText
@@ -48,6 +49,7 @@ import qualified Data.Aeson as Aeson
import Data.Bifunctor (bimap)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Int
import qualified Data.Map as M
import qualified Data.Map.Strict as Map
import Data.Scientific (toRealFloat)
@@ -144,6 +146,12 @@ int2cbytes :: (Integral a, Bounded a) => a -> ZCB.CBytes
int2cbytes = ZCB.buildCBytes . Build.int
{-# INLINE int2cbytes #-}

intToCBytesWithPadding :: Int64 -> ZCB.CBytes
intToCBytesWithPadding =
-- The max length of Word64 is 20: length $ show (maxBound :: Word64)
ZCB.buildCBytes . Build.intWith (Build.IFormat 20 Build.ZeroPadding False)
{-# INLINE intToCBytesWithPadding #-}

cBytesToText :: ZCB.CBytes -> Text
cBytesToText = Text.pack . ZCB.unpack

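Why the fixed width matters (spelled out here as context; the diff itself only shows the helper): findKey compares keys as byte strings, so variable-width decimal keys such as "9" and "10" sort in the wrong order, while padding every key to 20 characters, the decimal width of maxBound :: Word64, makes byte-wise order coincide with numeric order. A minimal sketch of the idea using only base, independent of the Z-Data builders used above:

```haskell
import Text.Printf (printf)

-- Pad to 20 characters, mirroring the comment in intToCBytesWithPadding
-- (length $ show (maxBound :: Word64) == 20).
padKey :: Int -> String
padKey = printf "%020d"

main :: IO ()
main = do
  print (compare "9" "10")                -- GT: unpadded string order disagrees with 9 < 10
  print (compare (padKey 9) (padKey 10))  -- LT: padded order matches numeric order
```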
8 changes: 7 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -74,6 +74,8 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
case elsn of
Left _ -> pure ()
Right (startlsn, _, _) -> do
Log.debug1 $ "start reading log "
<> Log.build logid <> " from " <> Log.build startlsn
S.readerStartReading reader logid startlsn S.LSN_MAX

-- Read records from storage
@@ -148,8 +150,12 @@ getPartitionLsn ldclient om logid partition offset = do
Just (latestOffset, tailLsn) -> do
let highwaterOffset = latestOffset + 1
if | offset < latestOffset -> do
let key = U.int2cbytes offset
let key = U.intToCBytesWithPadding offset
Log.debug1 $ "Try findKey " <> Log.buildString' key <> " in logid "
<> Log.build logid
(_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict
Log.debug1 $ "FindKey result " <> Log.build logid <> ": "
<> Log.build startLsn
pure $ Right (startLsn, tailLsn, highwaterOffset)
| offset == latestOffset ->
pure $ Right (tailLsn, tailLsn, highwaterOffset)
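How the lookup above is meant to meet the key written on the produce side (a toy in-memory model, assuming each stored batch carries the zero-padded last offset of its records, which is what appendRecords below sets up): a strict find returns the first record whose key is at or above the padded target offset, i.e. the batch containing that offset.

```haskell
import Data.List (find)
import Text.Printf (printf)

-- Toy model, not the HStream.Store API.
type Key = String
type Lsn = Int

padKey :: Int -> Key
padKey = printf "%020d"

-- "Strict" find: the first record whose key is >= the target key.
findKeyStrict :: [(Key, Lsn)] -> Key -> Maybe Lsn
findKeyStrict records k = snd <$> find ((>= k) . fst) records

main :: IO ()
main = do
  -- Three batches ending at offsets 2, 5 and 9, stored at LSNs 100..102;
  -- each is keyed by the zero-padded offset of its last record.
  let records = [(padKey 2, 100), (padKey 5, 101), (padKey 9, 102)]
  -- Looking up offset 4 lands on the batch ending at offset 5, which contains it.
  print (findKeyStrict records (padKey 4))  -- Just 101
```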
9 changes: 7 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -12,6 +12,7 @@ import Data.Word
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
@@ -51,11 +52,15 @@ handleProduceV2 ServerContext{..} _ K.ProduceRequestV2{..} = do
partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV2{..} -> do
let Just (_, logid) = partitions V.!? (fromIntegral index) -- TODO: handle Nothing
let Just recordBytes' = recordBytes -- TODO: handle Nothing

Log.debug1 $ "Append to logid " <> Log.build logid
<> "(" <> Log.build index <> ")"
-- Write appends
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager logid recordBytes'

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN

-- TODO: logAppendTimeMs, only support LogAppendTime now
pure $ K.PartitionProduceResponseV2 index K.NONE offset appendCompTimestamp

@@ -99,7 +104,7 @@ appendRecords shouldValidateCrc ldclient om logid bs = do
K.withOffsetN om logid (fromIntegral batchLength) $ \o -> do
let startOffset = o + 1 - fromIntegral batchLength
records' = K.modifyBatchRecordsOffset (+ startOffset) records
let appendKey = U.int2cbytes o
let appendKey = U.intToCBytesWithPadding o
appendAttrs = Just [(S.KeyTypeFindKey, appendKey)]
storedBs = K.encodeBatchRecords records'
-- FIXME unlikely overflow: convert batchLength from Int to Int32
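A quick worked example of the offset arithmetic in appendRecords, with made-up numbers (the diff does not contain them): if withOffsetN hands back o = 9 as the last offset reserved for a 3-record batch, the records occupy offsets 7..9 and the batch is appended under the zero-padded key of that last offset.

```haskell
import Text.Printf (printf)

-- Hypothetical values for illustration only.
batchLength, o, startOffset :: Int
batchLength = 3
o           = 9                     -- last offset reserved for the batch
startOffset = o + 1 - batchLength   -- 7, offset of the first record in the batch

main :: IO ()
main = do
  putStrLn ("records get offsets " ++ show startOffset ++ ".." ++ show o)
  putStrLn ("FindKey attribute: " ++ printf "%020d" o)  -- zero-padded last offset
```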
12 changes: 6 additions & 6 deletions hstream-store/test/HStream/StoreSpec.hs
@@ -77,22 +77,22 @@ base = describe "Base" $ do
void $ S.appendCompressedBS client randlogid "p" S.CompressionNone (Just [])
sn0 <- S.appendCompLSN <$>
S.appendCompressedBS client randlogid "p0" S.CompressionNone
(Just [(S.KeyTypeFindKey, "0")])
(Just [(S.KeyTypeFindKey, "00")])
sn1 <- S.appendCompLSN <$>
S.appendCompressedBS client randlogid "p1" S.CompressionNone
(Just [(S.KeyTypeFindKey, "1")])
(Just [(S.KeyTypeFindKey, "01")])
sn2 <- S.appendCompLSN <$>
S.appendCompressedBS client randlogid "p2" S.CompressionNone
(Just [(S.KeyTypeFindKey, "2")])
(Just [(S.KeyTypeFindKey, "02")])
void $ S.appendCompressedBS client randlogid "p" S.CompressionNone Nothing

(lo0, hi0) <- S.findKey client randlogid "0" S.FindKeyStrict
(lo0, hi0) <- S.findKey client randlogid "00" S.FindKeyStrict
lo0 `shouldBe` S.LSN_INVALID
hi0 `shouldBe` sn0
(lo1, hi1) <- S.findKey client randlogid "1" S.FindKeyStrict
(lo1, hi1) <- S.findKey client randlogid "01" S.FindKeyStrict
lo1 `shouldBe` sn0
hi1 `shouldBe` sn1
(lo2, hi2) <- S.findKey client randlogid "2" S.FindKeyStrict
(lo2, hi2) <- S.findKey client randlogid "02" S.FindKeyStrict
lo2 `shouldBe` sn1
hi2 `shouldBe` sn2

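The pair returned by findKey, as exercised by the expectations above, can be read as: the first component is the LSN of the last record whose key is strictly below the target (LSN_INVALID when there is none), the second that of the first record at or above it. A small pure model reproducing the test's expectations with placeholder LSNs (an illustration, not the LogDevice implementation):

```haskell
import Data.List (find)

lsnInvalid :: Int
lsnInvalid = -1  -- stand-in for S.LSN_INVALID

-- (lo, hi): last record with key < target, first record with key >= target.
findKeyPair :: [(String, Int)] -> String -> (Int, Int)
findKeyPair recs k =
  ( maybe lsnInvalid snd (safeLast (filter ((< k) . fst) recs))
  , maybe lsnInvalid snd (find ((>= k) . fst) recs) )
  where safeLast xs = if null xs then Nothing else Just (last xs)

main :: IO ()
main = do
  let recs = [("00", 10), ("01", 11), ("02", 12)]  -- sn0, sn1, sn2
  print (findKeyPair recs "00")  -- (-1, 10)  ~ (LSN_INVALID, sn0)
  print (findKeyPair recs "01")  -- (10, 11)  ~ (sn0, sn1)
  print (findKeyPair recs "02")  -- (11, 12)  ~ (sn1, sn2)
```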
