Upgrade Fetch version to v3 (#1688)
4eUeP authored Dec 5, 2023
1 parent ca26795 commit f79dfc0
Showing 9 changed files with 180 additions and 41 deletions.
9 changes: 8 additions & 1 deletion conf/hstream.yaml
@@ -248,12 +248,19 @@ kafka:
# gossip-interval: 1000000 # 1 sec
# probe-interval: 2000000 # 2 sec
# roundtrip-timeout: 500000 # 0.5 sec
#

# Broker options (compatible with Kafka)
#
#num.partitions: 1
#default.replication.factor: 1

# Internal storage options
#
#storage:
# fetch-mode: 1 # TODO: Currently, only mode 1 is supported
# fetch-reader-timeout: 50 # 50ms, default timeout of each read, 0 means nonblocking
# fetch-maxlen: 1000 # default max size of each read

# Configuration for HStream Store
# The configuration for hstore is **Optional**. When the values are not provided,
# hstreamdb will use the following configuration as the default configuration.
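
The three storage knobs above default to fetch mode 1 (the only mode currently supported), a 50 ms per-read timeout, and at most 1000 records per read. A minimal, self-contained sketch of decoding such a section with aeson — the key names and defaults mirror this commit, but this standalone FromJSON instance is illustrative only; the real parser (FromJson.hs below) threads CLI options through parseJSONToOptions:

{-# LANGUAGE OverloadedStrings #-}
import Data.Aeson (FromJSON (..), eitherDecode, withObject, (.!=), (.:?))

data StorageOptions = StorageOptions
  { fetchReaderTimeout :: Int  -- ms; 0 means nonblocking reads
  , fetchMaxLen        :: Int  -- max records returned by a single read
  } deriving (Show, Eq)

instance FromJSON StorageOptions where
  parseJSON = withObject "StorageOptions" $ \o -> StorageOptions
    <$> o .:? "fetch-reader-timeout" .!= 50    -- default: 50 ms
    <*> o .:? "fetch-maxlen"         .!= 1000  -- default: 1000 records

main :: IO ()
main = do
  -- A missing or empty section falls back to the defaults above.
  print (eitherDecode "{}" :: Either String StorageOptions)
  print (eitherDecode "{\"fetch-maxlen\":500}" :: Either String StorageOptions)
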
1 change: 1 addition & 0 deletions hstream-kafka/HStream/Kafka/Server/Config.hs
@@ -12,6 +12,7 @@ module HStream.Kafka.Server.Config
, TlsConfig (..)
, SecurityProtocolMap, defaultProtocolMap
, advertisedListenersToPB
, StorageOptions (..)
) where

import Control.Exception (throwIO)
21 changes: 13 additions & 8 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
@@ -79,13 +79,6 @@ parseJSONToOptions CliOptions{..} obj = do
joinWorkerConcurrency <- clusterCfgObj .:? "join-worker-concurrency" .!= joinWorkerConcurrency defaultGossipOpts
let _gossipOpts = GossipOpts {..}

-- Store Config
storeCfgObj <- obj .:? "hstore" .!= mempty
storeLogLevel <- readWithErrLog "store log-level" <$> storeCfgObj .:? "log-level" .!= "info"

let !_ldConfigPath = cliStoreConfigPath
let !_ldLogLevel = fromMaybe storeLogLevel cliLdLogLevel

-- TLS config
nodeEnableTls <- nodeCfgObj .:? "enable-tls" .!= False
nodeTlsKeyPath <- nodeCfgObj .:? "tls-key-path"
@@ -107,7 +100,19 @@ parseJSONToOptions CliOptions{..} obj = do
nodeEnableSaslAuth <- nodeCfgObj .:? "enable-sasl" .!= False
let !_enableSaslAuth = cliEnableSaslAuth || nodeEnableSaslAuth

return ServerOpts {..}
-- Store Config
storeCfgObj <- obj .:? "hstore" .!= mempty
storeLogLevel <- readWithErrLog "store log-level" <$> storeCfgObj .:? "log-level" .!= "info"
let !_ldConfigPath = cliStoreConfigPath
let !_ldLogLevel = fromMaybe storeLogLevel cliLdLogLevel

-- Storage config
storageCfg <- nodeCfgObj .:? "storage" .!= mempty
fetchReaderTimeout <- storageCfg .:? "fetch-reader-timeout" .!= 50
fetchMaxLen <- storageCfg .:? "fetch-maxlen" .!= 1000
let _storage = StorageOptions{..}

return ServerOpts{..}

-------------------------------------------------------------------------------

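Throughout parseJSONToOptions the same precedence idiom recurs: a CLI option, when given, overrides the config-file value (fromMaybe for optional values, (||) for boolean enables such as _enableSaslAuth). A minimal sketch of that rule, with hypothetical names:

import Data.Maybe (fromMaybe)

-- The CLI value wins when present; the file value is only the fallback.
resolveOpt :: Maybe Int -> Int -> Int
resolveOpt cliValue fileValue = fromMaybe fileValue cliValue

main :: IO ()
main = do
  print (resolveOpt (Just 10) 50)  -- CLI set: 10
  print (resolveOpt Nothing 50)    -- CLI unset: falls back to 50
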
18 changes: 13 additions & 5 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
@@ -9,6 +9,7 @@ module HStream.Kafka.Server.Config.Types
, ListenersSecurityProtocolMap
, TlsConfig (..)
, SecurityProtocolMap, defaultProtocolMap
, StorageOptions (..)

-- * Helpers
, advertisedListenersToPB
@@ -66,15 +67,17 @@ data ServerOpts = ServerOpts

, _maxRecordSize :: !Int
, _seedNodes :: ![(ByteString, Int)]

, _ldLogLevel :: !LDLogLevel
, _ldConfigPath :: !CBytes

, _compression :: !Compression
, _disableAutoCreateTopic :: !Bool

, _enableSaslAuth :: !Bool

, _kafkaBrokerConfigs :: !KC.KafkaBrokerConfigs

-- Store Options
, _storage :: !StorageOptions
, _compression :: !Compression
, _ldLogLevel :: !LDLogLevel
, _ldConfigPath :: !CBytes
} deriving (Show, Eq)

-------------------------------------------------------------------------------
@@ -184,3 +187,8 @@ parseMetaStoreAddr t =
| s == "file" -> FileAddr . Text.unpack $ ip
| otherwise -> errorWithoutStackTrace $ "Invalid meta store address, unsupported scheme: " <> show s
Left eMsg -> errorWithoutStackTrace eMsg

data StorageOptions = StorageOptions
{ fetchReaderTimeout :: Int
, fetchMaxLen :: Int
} deriving (Show, Eq)
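
ServerOpts now nests the new record as _storage; handler code reads it with record-dot syntax, as readMode1 below does via serverOpts._storage.fetchReaderTimeout. A runnable sketch of that access pattern, using a trimmed-down ServerOpts for illustration:

{-# LANGUAGE OverloadedRecordDot #-}

data StorageOptions = StorageOptions
  { fetchReaderTimeout :: Int
  , fetchMaxLen        :: Int
  } deriving (Show, Eq)

-- Only the field relevant here; the real ServerOpts carries many more.
newtype ServerOpts = ServerOpts { _storage :: StorageOptions }

main :: IO ()
main = do
  let opts = ServerOpts (StorageOptions 50 1000)  -- the shipped defaults
  print (opts._storage.fetchReaderTimeout, opts._storage.fetchMaxLen)
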
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
@@ -57,7 +57,7 @@ import qualified Kafka.Protocol.Service as K

#cv_handler ApiVersions, 0, 3
#cv_handler Produce, 0, 2
#cv_handler Fetch, 0, 2
#cv_handler Fetch, 0, 3
#cv_handler DescribeConfigs, 0, 0

#cv_handler SaslHandshake, 0, 1
@@ -69,7 +69,7 @@ handlers sc =
-- Write
, #mk_handler Produce, 0, 2
-- Read
, #mk_handler Fetch, 0, 2
, #mk_handler Fetch, 0, 3

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc)
87 changes: 65 additions & 22 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -6,6 +6,7 @@ module HStream.Kafka.Server.Handler.Consume

import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import Data.Either (isRight)
@@ -25,6 +26,8 @@ import HStream.Kafka.Metrics.ConsumeStats (readLatencySnd,
topicTotalSendBytes,
topicTotalSendMessages,
totalConsumeRequest)
import HStream.Kafka.Server.Config (ServerOpts (..),
StorageOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
@@ -52,11 +55,12 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
-- kafka broker just throws java.lang.RuntimeException if topics is null, here
-- we do the same.
let K.NonNullKaArray topicReqs = r.topics
topics <- V.forM topicReqs $ \K.FetchTopic{..} -> do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topic)
let K.NonNullKaArray partitionReqs = partitions
ps <- V.forM partitionReqs $ \p -> do
P.withLabel totalConsumeRequest (topic, T.pack . show $ p.partition) $
topics <- V.forM topicReqs $ \topic{- K.FetchTopic -} -> do
orderedParts <- S.listStreamPartitionsOrdered scLDClient
(S.transToTopicStreamName topic.topic)
let K.NonNullKaArray partitionReqs = topic.partitions
ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
P.withLabel totalConsumeRequest (topic.topic, T.pack . show $ p.partition) $
\counter -> void $ P.addCounter counter 1
let m_logid = orderedParts V.!? fromIntegral p.partition
case m_logid of
@@ -69,10 +73,10 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
elsn <- getPartitionLsn scLDClient scOffsetManager logid p.partition
p.fetchOffset
pure (logid, elsn, p)
pure (topic, ps)
pure (topic.topic, ps)

let numOfReads = V.sum $
V.map (V.length . (V.filter $ \(_, x, _) -> isRight x) . snd) topics
V.map (V.length . V.filter (\(_, x, _) -> isRight x) . snd) topics
when (numOfReads < 1) $ do
respTopics <- V.forM topics $ \(topic, partitions) -> do
respPartitionDatas <- V.forM partitions $ \(_, elsn, _) -> do
@@ -97,24 +101,22 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
S.readerStartReading reader logid startlsn S.LSN_MAX

-- Read records from storage
if r.minBytes <= 0 || r.maxWaitMs <= 0
then S.readerSetTimeout reader 0 -- nonblocking
else S.readerSetTimeout reader r.maxWaitMs
S.readerSetWaitOnlyWhenNoData reader
(_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
-- TODO: If we can record latency at a more granular level, such as for
-- a consumer group, or a stream?
rs <- P.observeDuration readLatencySnd $ S.readerRead reader 100
if null rs
then pure ((size, acc), False)
else do let size' = size + (sum $ map (K.recordBytesSize . (.recordPayload)) rs)
acc' = acc <> rs
if size' >= fromIntegral r.minBytes
then pure ((size', acc'), False)
else pure ((size', acc'), True)
--
-- TODO:
-- - dynamically change reader settings according to the client request
-- (e.g. maxWaitMs, minBytes...)
-- - handle maxBytes
--
-- FIXME: Do not setWaitOnlyWhenNoData if you are mostly focusing on
-- throughput
--
-- Mode1
records <- readMode1 reader

-- Process read records
-- TODO: what if the client sends the same topic twice but with different partitions?
--
-- {logid: [RecordFormat]}
readRecords <- HT.initialize numOfReads :: IO RecordTable
forM_ records $ \record -> do
recordFormat <- K.runGet @K.RecordFormat record.recordPayload
@@ -156,6 +158,47 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}

where
readMode0 :: S.LDReader -> IO [S.DataRecord ByteString]
readMode0 reader = do
if r.minBytes <= 0 || r.maxWaitMs <= 0
then S.readerSetTimeout reader 0 -- nonblocking
else S.readerSetTimeout reader r.maxWaitMs
S.readerSetWaitOnlyWhenNoData reader
(_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
rs <- P.observeDuration readLatencySnd $ S.readerRead reader 100
if null rs
then pure ((size, acc), False)
else do let size' = size + sum (map (K.recordBytesSize . (.recordPayload)) rs)
acc' = acc <> rs
if size' >= fromIntegral r.minBytes
then pure ((size', acc'), False)
else pure ((size', acc'), True)
pure records

readMode1 :: S.LDReader -> IO [S.DataRecord ByteString]
readMode1 reader = do
let storageOpts = serverOpts._storage
defTimeout = fromIntegral storageOpts.fetchReaderTimeout

if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately
then do S.readerSetTimeout reader 0 -- nonblocking
S.readerRead reader storageOpts.fetchMaxLen
else
if r.maxWaitMs > defTimeout
then do
S.readerSetTimeout reader defTimeout
rs1 <- S.readerRead reader storageOpts.fetchMaxLen
let size = sum (map (K.recordBytesSize . (.recordPayload)) rs1)
if size >= fromIntegral r.minBytes
then pure rs1
else do S.readerSetTimeout reader (r.maxWaitMs - defTimeout)
rs2 <- S.readerRead reader storageOpts.fetchMaxLen
pure $ rs1 <> rs2
else do
S.readerSetTimeout reader r.maxWaitMs
S.readerRead reader storageOpts.fetchMaxLen
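
readMode1 replaces the old accumulate-until-minBytes loop (readMode0, kept above for reference) with at most two reads: one bounded by the configured fetch-reader-timeout, then — only if minBytes is not yet satisfied — a second read for the remaining maxWaitMs budget. A distilled, self-contained sketch of that wait-splitting logic, where setTimeout and readOnce stand in for S.readerSetTimeout and S.readerRead (the immediate nonblocking path for minBytes <= 0 is omitted):

-- Distilled two-phase read; a sketch, not the actual handler.
twoPhaseRead
  :: Monad m
  => (Int -> m ())  -- set the reader timeout, in ms
  -> m [a]          -- perform one read
  -> (a -> Int)     -- payload size of one record, in bytes
  -> Int            -- defTimeout: fetch-reader-timeout from StorageOptions
  -> Int            -- maxWaitMs from the Fetch request
  -> Int            -- minBytes from the Fetch request
  -> m [a]
twoPhaseRead setTimeout readOnce sizeOf defTimeout maxWaitMs minBytes
  | maxWaitMs <= defTimeout = setTimeout maxWaitMs >> readOnce
  | otherwise = do
      setTimeout defTimeout
      rs1 <- readOnce
      if sum (map sizeOf rs1) >= minBytes
        then pure rs1                         -- first read already satisfies minBytes
        else do
          setTimeout (maxWaitMs - defTimeout)
          rs2 <- readOnce                     -- spend the remaining wait budget
          pure (rs1 <> rs2)

With the defaults, a request carrying maxWaitMs = 500 and minBytes = 1 polls for at most 50 ms first; only an empty first read triggers the second, 450 ms wait, so a busy partition responds with 50 ms granularity instead of blocking for the client's full timeout.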

-------------------------------------------------------------------------------

-- Return tuple of (startLsn, tailLsn, highwaterOffset)
39 changes: 37 additions & 2 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
@@ -218,6 +218,10 @@ type FetchPartitionV2 = FetchPartitionV0

type FetchTopicV2 = FetchTopicV0

type FetchPartitionV3 = FetchPartitionV0

type FetchTopicV3 = FetchTopicV0

data PartitionDataV0 = PartitionDataV0
{ partitionIndex :: {-# UNPACK #-} !Int32
-- ^ The partition index.
@@ -246,6 +250,10 @@ type PartitionDataV2 = PartitionDataV0

type FetchableTopicResponseV2 = FetchableTopicResponseV0

type PartitionDataV3 = PartitionDataV0

type FetchableTopicResponseV3 = FetchableTopicResponseV0

data JoinGroupRequestProtocolV0 = JoinGroupRequestProtocolV0
{ name :: !Text
-- ^ The protocol name.
@@ -736,6 +744,22 @@ type FetchRequestV1 = FetchRequestV0

type FetchRequestV2 = FetchRequestV0

data FetchRequestV3 = FetchRequestV3
{ replicaId :: {-# UNPACK #-} !Int32
-- ^ The broker ID of the follower, or -1 if this request is from a
-- consumer.
, maxWaitMs :: {-# UNPACK #-} !Int32
-- ^ The maximum time in milliseconds to wait for the response.
, minBytes :: {-# UNPACK #-} !Int32
-- ^ The minimum bytes to accumulate in the response.
, maxBytes :: {-# UNPACK #-} !Int32
-- ^ The maximum bytes to fetch. See KIP-74 for cases where this limit may
-- not be honored.
, topics :: !(KaArray FetchTopicV0)
-- ^ The topics to fetch.
} deriving (Show, Eq, Generic)
instance Serializable FetchRequestV3

newtype FetchResponseV0 = FetchResponseV0
{ responses :: (KaArray FetchableTopicResponseV0)
} deriving (Show, Eq, Generic)
@@ -752,6 +776,8 @@ instance Serializable FetchResponseV1

type FetchResponseV2 = FetchResponseV1

type FetchResponseV3 = FetchResponseV1
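
Relative to v2, the only request-side change is maxBytes, the KIP-74 cap on total response size; the response shape is unchanged, hence the aliases to the v1 types. This commit leaves maxBytes handling as a TODO in the handler; a sketch of how a broker could apply the cap while always returning at least the first batch (KIP-74's rule so oversized records still make progress) — illustrative only, not this commit's logic:

-- Accumulate batches (sizes in bytes) until adding one more would exceed
-- maxBytes, but always keep the first batch.
applyMaxBytes :: Int -> [Int] -> [Int]
applyMaxBytes _        []       = []
applyMaxBytes maxBytes (b : bs) = b : go b bs
  where
    go _    []       = []
    go used (x : xs)
      | used + x <= maxBytes = x : go (used + x) xs
      | otherwise            = []

main :: IO ()
main = do
  print (applyMaxBytes 100 [40, 50, 30])  -- [40,50]: the third would exceed the cap
  print (applyMaxBytes 10  [40, 50])      -- [40]: the first batch is always kept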

newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0
{ key :: Text
} deriving (Show, Eq, Generic)
@@ -1390,10 +1416,18 @@ data HStreamKafkaV3
instance Service HStreamKafkaV3 where
type ServiceName HStreamKafkaV3 = "HStreamKafkaV3"
type ServiceMethods HStreamKafkaV3 =
'[ "metadata"
'[ "fetch"
, "metadata"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV3 "fetch" where
type MethodName HStreamKafkaV3 "fetch" = "fetch"
type MethodKey HStreamKafkaV3 "fetch" = 1
type MethodVersion HStreamKafkaV3 "fetch" = 3
type MethodInput HStreamKafkaV3 "fetch" = FetchRequestV3
type MethodOutput HStreamKafkaV3 "fetch" = FetchResponseV3

instance HasMethodImpl HStreamKafkaV3 "metadata" where
type MethodName HStreamKafkaV3 "metadata" = "metadata"
type MethodKey HStreamKafkaV3 "metadata" = 3
@@ -1453,7 +1487,7 @@ instance Show ApiKey where
supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 2
, ApiVersionV0 (ApiKey 1) 0 2
, ApiVersionV0 (ApiKey 1) 0 3
, ApiVersionV0 (ApiKey 2) 0 1
, ApiVersionV0 (ApiKey 3) 0 4
, ApiVersionV0 (ApiKey 8) 0 2
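
The ApiVersions entry for ApiKey 1 (Fetch) now advertises versions 0 through 3. Client and broker settle on the highest mutually supported version per API key; a minimal sketch of that negotiation rule (general Kafka-protocol behavior, not code from this repository):

-- Both sides use the highest version in the intersection of their ranges.
negotiate
  :: (Int, Int)  -- broker (min, max) advertised for an API key
  -> (Int, Int)  -- client (min, max) implemented
  -> Maybe Int
negotiate (bMin, bMax) (cMin, cMax)
  | lo <= hi  = Just hi
  | otherwise = Nothing
  where
    lo = max bMin cMin
    hi = min bMax cMax

main :: IO ()
main = print (negotiate (0, 3) (0, 11))  -- Fetch after this commit: Just 3
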
@@ -1480,6 +1514,7 @@ getHeaderVersion (ApiKey 0) 2 = (1, 0)
getHeaderVersion (ApiKey 1) 0 = (1, 0)
getHeaderVersion (ApiKey 1) 1 = (1, 0)
getHeaderVersion (ApiKey 1) 2 = (1, 0)
getHeaderVersion (ApiKey 1) 3 = (1, 0)
getHeaderVersion (ApiKey 2) 0 = (1, 0)
getHeaderVersion (ApiKey 2) 1 = (1, 0)
getHeaderVersion (ApiKey 3) 0 = (1, 0)