Skip to content

Commit

Permalink
kafka: fix encoding of RecordBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Dec 4, 2023
1 parent 4e3f43e commit 11c3a8d
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@
[submodule "external/haskell-rocksdb-bindings"]
path = external/haskell-rocksdb-bindings
url = https://github.com/hstreamdb/haskell-rocksdb-bindings.git
[submodule "external/digest"]
path = external/digest
url = https://github.com/4eUeP/digest.git
8 changes: 8 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ packages:
external/gRPC-haskell/grpc-haskell.cabal
external/gRPC-haskell/core/grpc-haskell-core.cabal
external/haskell-rocksdb-bindings
external/digest/digest.cabal

-- * Cpp library
common/api/cpp/hstream-api-cpp.cabal
Expand Down Expand Up @@ -52,6 +53,13 @@ package rocksdb-haskell-bindings
extra-lib-dirs: /usr/local/lib
tests: false

-- TODO: A safer way to detect CPU features
package digest
if (arch(x86_64) && os(linux))
flags: +have_builtin_prefetch +have_sse42 +have_strong_getauxval +have_weak_getauxval
else
flags: +have_builtin_prefetch +have_weak_getauxval

constraints:
Z-Data == 2.0.0.2
, zoovisitor == 0.2.6.1
Expand Down
1 change: 1 addition & 0 deletions external/digest
Submodule digest added at d03f34
2 changes: 1 addition & 1 deletion hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ library
HStream.Kafka.Metrics.ServerStats
HStream.Kafka.Server.Config.FromCli
HStream.Kafka.Server.Config.FromJson
HStream.Kafka.Server.Config.Types
HStream.Kafka.Server.Config.KafkaConfig
HStream.Kafka.Server.Config.KafkaConfigManager
HStream.Kafka.Server.Config.Types
HStream.Kafka.Server.Core.Topic
HStream.Kafka.Server.Handler.Basic
HStream.Kafka.Server.Handler.Consume
Expand Down
34 changes: 23 additions & 11 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import qualified Data.ByteString as BS
import Data.ByteString.Internal (w2c)
import qualified Data.ByteString.Lazy as BL
import Data.Digest.CRC32 (crc32)
import Data.Digest.CRC32C (crc32c)
import Data.Int
import Data.Maybe
import Data.String (IsString)
Expand Down Expand Up @@ -350,7 +351,7 @@ data BatchRecord
= BatchRecordV0 RecordV0
| BatchRecordV1 RecordV1
| BatchRecordV2 RecordBatch
deriving (Show)
deriving (Show, Eq)

decodeBatchRecords :: Bool -> ByteString -> IO (Vector BatchRecord)
decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
Expand All @@ -365,6 +366,9 @@ decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
throwIO $ DecodeError $ "Invalid messageSize"
when shouldValidateCrc $ do
-- NOTE: pass the original inputs to validLegacyCrc, not the bs'
--
-- The crc field contains the CRC32 (and not CRC-32C) of the
-- subsequent message bytes (i.e. from magic byte to the value).
validLegacyCrc (fromIntegral batchLength) crc bs
(RecordBodyV0{..}, remainder) <- runGet' @RecordBodyV0 bs'
!v' <- Growing.append v (BatchRecordV0 RecordV0{..})
Expand All @@ -375,15 +379,23 @@ decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
throwIO $ DecodeError $ "Invalid messageSize"
when shouldValidateCrc $ do
-- NOTE: pass the original inputs to validLegacyCrc, not the bs'
--
-- The crc field contains the CRC32 (and not CRC-32C) of the
-- subsequent message bytes (i.e. from magic byte to the value).
validLegacyCrc (fromIntegral batchLength) crc bs
(RecordBodyV1{..}, remainder) <- runGet' @RecordBodyV1 bs'
!v' <- Growing.append v (BatchRecordV1 RecordV1{..})
decode remainder v'
2 -> do let partitionLeaderEpoch = partitionLeaderEpochOrCrc
-- The CRC covers the data from the attributes to the end of
-- the batch (i.e. all the bytes that follow the CRC).
--
-- The CRC-32C (Castagnoli) polynomial is used for the
-- computation.
(crc, bs'') <- runGet' @Int32 bs'
when (shouldValidateCrc && fromIntegral (crc32 bs'') /= crc) $
when (shouldValidateCrc && fromIntegral (crc32c bs'') /= crc) $
throwIO $ DecodeError "Invalid CRC32"
(RecordBodyV2{..}, remainder) <- runGet' @RecordBodyV2 bs'
(RecordBodyV2{..}, remainder) <- runGet' @RecordBodyV2 bs''
!v' <- Growing.append v (BatchRecordV2 RecordBatch{..})
decode remainder v'
_ -> throwIO $ DecodeError $ "Invalid magic " <> show magic
Expand Down Expand Up @@ -425,7 +437,7 @@ data RecordBase = RecordBase
-- ^ For version 0-1, this is the CRC32 of the remainder of the record.
-- For version 2, this is the partition leader epoch.
, magic :: {-# UNPACK #-} !Int8
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordBase

Expand All @@ -436,7 +448,7 @@ data RecordBodyV0 = RecordBodyV0
{ attributes :: {-# UNPACK #-} !Int8
, key :: !NullableBytes
, value :: !NullableBytes
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordBodyV0

Expand All @@ -448,7 +460,7 @@ data RecordBodyV1 = RecordBodyV1
, timestamp :: {-# UNPACK #-} !Int64
, key :: !NullableBytes
, value :: !NullableBytes
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordBodyV1

Expand All @@ -464,7 +476,7 @@ data RecordBodyV2 = RecordBodyV2
, producerEpoch :: {-# UNPACK #-} !Int16
, baseSequence :: {-# UNPACK #-} !Int32
, records :: !(KaArray RecordV2)
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordBodyV2

Expand Down Expand Up @@ -499,7 +511,7 @@ data RecordV0 = RecordV0
, attributes :: {-# UNPACK #-} !Int8
, key :: !NullableBytes
, value :: !NullableBytes
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordV0

Expand All @@ -516,7 +528,7 @@ data RecordV1 = RecordV1
, timestamp :: {-# UNPACK #-} !Int64
, key :: !NullableBytes
, value :: !NullableBytes
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordV1

Expand Down Expand Up @@ -558,7 +570,7 @@ data RecordV2 = RecordV2
, key :: !RecordKey
, value :: !RecordValue
, headers :: !(RecordArray RecordHeader)
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordV2

Expand All @@ -576,7 +588,7 @@ data RecordBatch = RecordBatch
, producerEpoch :: {-# UNPACK #-} !Int16
, baseSequence :: {-# UNPACK #-} !Int32
, records :: !(KaArray RecordV2)
} deriving (Generic, Show)
} deriving (Generic, Show, Eq)

instance Serializable RecordBatch

Expand Down
51 changes: 51 additions & 0 deletions hstream-kafka/protocol/test/Kafka/Protocol/EncodingSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,54 @@ realSpec = describe "Kafka.Protocol.Encoding" $ do

(runPut reqLen <> reqBs) `shouldBe` clientReqBs
runGet clientReqBs `shouldReturn` (reqLen, reqHeader, reqBody)

-- Round-trip spec for Kafka message format v2 (the "record batch" format):
-- decode a fixed wire-format byte string into the expected 'BatchRecordV2'
-- value, then re-encode that value and require the original bytes back.
it "Message Format V2 (Record Batch)" $ do
-- Captured wire bytes of a single v2 record batch holding two records.
let clientReqBs =
"\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\139\NUL\NUL\NUL\NUL"
<> "\STX\164\239\196*\NUL\NUL\NUL\NUL\NUL\SOH\NUL\NUL\SOH\140\&3~\176"
<> "\DLE\NUL\NUL\SOH\140\&3~\176\DLE\255\255\255\255\255\255\255\255"
<> "\255\255\255\255\255\255\NUL\NUL\NUL\STXX\NUL\NUL\NUL\SOH\FS"
<> "some_message_0\EOT\SOheader1\ACKfoo\SOheader2\ACKbarX\NUL\NUL\STX"
<> "\SOH\FSsome_message_1\EOT\SOheader1\ACKfoo\SOheader2\ACKbar"
-- Expected decoded payload: two records that differ only in value
-- and offsetDelta, each carrying the same two headers.
reqRecords = KaArray $ Just $
[RecordV2{ length = 44
, attributes = 0
, timestampDelta = 0
, offsetDelta = 0
, key = RecordKey Nothing
, value = RecordValue $ Just "some_message_0"
, headers = RecordArray $
[ ("header1", RecordHeaderValue $ Just "foo")
, ("header2", RecordHeaderValue $ Just "bar")
]
}
, RecordV2{ length = 44
, attributes = 0
, timestampDelta = 0
, offsetDelta = 1
, key = RecordKey Nothing
, value = RecordValue $ Just "some_message_1"
, headers = RecordArray $
[ ("header1", RecordHeaderValue $ Just "foo")
, ("header2", RecordHeaderValue $ Just "bar")
]
}
]
-- Expected decoded batch header around those records.
reqRecordBatch = RecordBatch
{ baseOffset = 0
, batchLength = 139
, partitionLeaderEpoch = 0
, magic = 2
-- NOTE(review): presumably the CRC-32C (Castagnoli) checksum of the
-- bytes following this field, per the v2 format — confirm against
-- the encoder.
, crc = -1527790550
, attributes = 0
, lastOffsetDelta = 1
, baseTimestamp = 1701670989840
, maxTimestamp = 1701670989840
-- -1 producerId/producerEpoch/baseSequence: no idempotent producer.
, producerId = -1
, producerEpoch = -1
, baseSequence = -1
, records = reqRecords
}
clientReq = [BatchRecordV2 reqRecordBatch]
-- True enables CRC validation during decoding, so the decode assertion
-- also exercises the checksum check.
decodeBatchRecords True clientReqBs `shouldReturn` clientReq
encodeBatchRecords clientReq `shouldBe` clientReqBs

0 comments on commit 11c3a8d

Please sign in to comment.