fix(kafka): improve kafka configs #1696

merged 5 commits into from
Nov 27, 2023
Changes from all commits
6 changes: 3 additions & 3 deletions conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,10 @@ kafka:
# probe-interval: 2000000 # 2 sec
# roundtrip-timeout: 500000 # 0.5 sec
#Kafka options
# Broker options (compatible with Kafka)
#num-partitions: 1
#default-replication-factor: 1
#num.partitions: 1
#default.replication.factor: 1

# Configuration for HStream Store
# The configuration for hstore is **Optional**. When the values are not provided,
Expand Down
8 changes: 2 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ parseJSONToOptions CliOptions{..} obj = do
-- Kafka config
-- TODO: generate Parser from KafkaBrokerConfigs
let !_disableAutoCreateTopic = cliDisableAutoCreateTopic
numPartitions <- nodeCfgObj .:? "num-partitions" .!= 1
defaultReplica <- nodeCfgObj .:? "default-replication-factor" .!= 1
let !_topicRepFactor = numPartitions
let !_partitionNums = defaultReplica
let autoCreateTopicsEnable = KC.AutoCreateTopicsEnable (not _disableAutoCreateTopic)
!_kafkaBrokerConfigs = KC.KafkaBrokerConfigs {..}
let updateBrokerConfigs cfg = cfg {KC.autoCreateTopicsEnable=KC.AutoCreateTopicsEnable $ not _disableAutoCreateTopic}
!_kafkaBrokerConfigs <- updateBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj

-- TODO: For max_record_size to work properly, we should also tell the user
-- to set the payload size for gRPC and LD.
Expand Down
160 changes: 119 additions & 41 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DefaultSignatures #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedStrings #-}

module HStream.Kafka.Server.Config.KafkaConfig where
import qualified Control.Monad as M
import Data.Int (Int32)
import qualified Data.Map as Map
import qualified Data.Text as T
import qualified Data.Text.Read as T
import qualified Data.Vector as V
import qualified Control.Monad as M
import qualified Data.Aeson.Key as Y
import qualified Data.Aeson.Text as Y
import Data.Int (Int32)
import qualified Data.Map as Map
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Read as T
import qualified Data.Vector as V
import qualified Data.Yaml as Y
import qualified GHC.Generics as G

data KafkaConfigResource
Expand Down Expand Up @@ -43,6 +48,14 @@ class Eq kc => KafkaConfig kc where
isDefaultValue = (defaultConfig == )

data KafkaConfigInstance = forall kc. KafkaConfig kc => KafkaConfigInstance kc
-- | Two wrapped configs are equal only when they describe the same setting
-- (same 'name') and carry the same rendered 'value'.
--
-- Comparing the rendered value alone would make unrelated settings that
-- happen to share a value (for example two integer configs both set to @1@)
-- compare equal.
instance Eq KafkaConfigInstance where
  KafkaConfigInstance x == KafkaConfigInstance y =
    name x == name y && value x == value y
-- | Lets an existentially wrapped config be used wherever a 'KafkaConfig' is
-- expected. Every method that receives a wrapped value delegates to it.
--
-- 'fromText' and 'defaultConfig' receive no wrapped value, so there is no
-- concrete config type to build: 'fromText' reports that as a 'Left', and
-- 'defaultConfig' raises a labelled error instead of diverging on a
-- self-referential definition.
instance KafkaConfig KafkaConfigInstance where
  name (KafkaConfigInstance x) = name x
  value (KafkaConfigInstance x) = value x
  isSentitive (KafkaConfigInstance x) = isSentitive x
  fromText _ =
    Left "KafkaConfigInstance: cannot parse a value without a concrete config type"
  defaultConfig =
    error "KafkaConfigInstance.defaultConfig: no default exists for a wrapped config"
  -- The class default compares against 'defaultConfig', which cannot be
  -- produced for this type; ask the wrapped config instead.
  isDefaultValue (KafkaConfigInstance x) = isDefaultValue x

-- Kafka Topic Config
Expand Down Expand Up @@ -72,14 +85,12 @@ data KafkaTopicConfigs
= KafkaTopicConfigs
{ cleanupPolicy :: CleanupPolicy
, retentionMs :: RetentionMs
} deriving (G.Generic)
instance KafkaConfigs KafkaTopicConfigs

-- TODO: build from KafkaTopicConfigs
mkKafkaTopicConfigs :: Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaTopicConfigs
mkKafkaTopicConfigs configs =
<$> lookupConfig (defaultConfig @CleanupPolicy) configs
<*> lookupConfig (defaultConfig @RetentionMs) configs
mkKafkaTopicConfigs configs = mkConfigs @KafkaTopicConfigs lk
where lk x = M.join (Map.lookup x configs)

-- Kafka Broker Config
Expand All @@ -95,41 +106,108 @@ instance KafkaConfig AutoCreateTopicsEnable where
fromText v = Left $ "invalid bool value:" <> v
defaultConfig = AutoCreateTopicsEnable True

newtype KafkaBrokerConfigs
-- | Broker setting @num.partitions@, stored as an 'Int'. Defaults to 1.
newtype NumPartitions = NumPartitions { _value :: Int } deriving (Eq, Show)
instance KafkaConfig NumPartitions where
  name = const "num.partitions"
  value (NumPartitions v) = Just . T.pack $ show v
  isSentitive = const False
  -- A topic needs at least one partition. Zero and negative numbers
  -- (including -1, which topic creation treats as "use the default") are
  -- rejected here rather than being passed on to topic creation.
  fromText t = textToIntE t >>= \n ->
    if n >= 1
      then Right (NumPartitions n)
      else Left $ "invalid num.partitions value, must be >= 1:" <> t
  defaultConfig = NumPartitions 1

-- | Broker setting @default.replication.factor@, stored as an 'Int'.
-- Defaults to 1.
newtype DefaultReplicationFactor = DefaultReplicationFactor { _value :: Int } deriving (Eq, Show)
instance KafkaConfig DefaultReplicationFactor where
  name = const "default.replication.factor"
  value (DefaultReplicationFactor v) = Just . T.pack $ show v
  isSentitive = const False
  -- At least one replica is required. Zero and negative numbers (including
  -- -1, which topic creation treats as "use the default") are rejected here
  -- rather than being passed on to topic creation.
  fromText t = textToIntE t >>= \n ->
    if n >= 1
      then Right (DefaultReplicationFactor n)
      else Left $ "invalid default.replication.factor value, must be >= 1:" <> t
  defaultConfig = DefaultReplicationFactor 1

data KafkaBrokerConfigs
= KafkaBrokerConfigs
{ autoCreateTopicsEnable :: AutoCreateTopicsEnable
} deriving (Show, Eq)
{ autoCreateTopicsEnable :: AutoCreateTopicsEnable
, numPartitions :: NumPartitions
, defaultReplicationFactor :: DefaultReplicationFactor
} deriving (Show, Eq, G.Generic)
instance KafkaConfigs KafkaBrokerConfigs

-- | Build 'KafkaBrokerConfigs' from a parsed YAML object.
--
-- Each broker config is looked up in @obj@ under its own config name.
-- A key that is absent yields 'Nothing' from the lookup. A string value is
-- passed through unchanged; any other value is rendered with
-- 'Y.encodeToLazyText' before being handed to the config's text parser.
--
-- An invalid value is reported as a parser failure carrying the message
-- returned by 'mkConfigs'.
parseBrokerConfigs :: Y.Object -> Y.Parser KafkaBrokerConfigs
parseBrokerConfigs obj =
  case mkConfigs @KafkaBrokerConfigs lk of
    -- 'fail' keeps a bad value inside the parser's own error channel;
    -- 'error' would throw an exception when the parser is run instead.
    Left msg -> fail (T.unpack msg)
    Right v  -> pure v
  where
    lk :: Lookup
    lk configName = Y.parseMaybe (obj Y..:) (Y.fromText configName) >>= \case
      Y.String v -> Just v
      x          -> Just . TL.toStrict . Y.encodeToLazyText $ x

-- TODO: generate from KafkaBrokerConfigs
allBrokerConfigs :: KafkaBrokerConfigs -> V.Vector KafkaConfigInstance
allBrokerConfigs KafkaBrokerConfigs{..} = V.fromList
[ KafkaConfigInstance $ autoCreateTopicsEnable
allBrokerConfigs = V.fromList . Map.elems . dumpConfigs

-- Config Helpers
type Lookup = T.Text -> Maybe T.Text
type ConfigMap = Map.Map T.Text KafkaConfigInstance

-- | A record whose fields are all 'KafkaConfig' values.
--
-- Every method has a default implementation derived from the record's
-- 'G.Generic' representation, so an instance can be declared with an empty
-- body once the record derives 'G.Generic'.
class KafkaConfigs a where
  -- | Build the record from a lookup function, or return the error message
  -- produced while parsing a field.
  mkConfigs      :: Lookup -> Either T.Text a
  -- | Flatten the record into a map keyed by config name.
  dumpConfigs    :: a -> ConfigMap
  -- | The record with every field set to its default.
  defaultConfigs :: a

  default mkConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => Lookup -> Either T.Text a
  -- 'G.to' converts the generic representation back into the record type.
  mkConfigs lk = G.to <$> gmkConfigs lk

  default dumpConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a -> ConfigMap
  dumpConfigs = gdumpConfigs . G.from

  default defaultConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a
  defaultConfigs = G.to gdefaultConfigs

-- | Generic counterpart of 'KafkaConfigs', defined over the pieces of a
-- record's 'G.Generic' representation.
class GKafkaConfigs f where
  -- | Build this piece of the representation from a lookup function.
  gmkConfigs      :: Lookup -> Either T.Text (f p)
  -- | Collect the configs held by this piece into a map.
  gdumpConfigs    :: f p -> ConfigMap
  -- | This piece of the representation filled with defaults.
  gdefaultConfigs :: f p

-- | A single record field holding one 'KafkaConfig'.
--
-- The field is looked up under the name of its default config; a missing
-- entry keeps the default, a present one is parsed with 'fromText'.
instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where
  gmkConfigs lk =
    G.K1 <$> maybe (Right fallback) (fromText @c) (lk (name fallback))
    where
      fallback = defaultConfig @c
  gdumpConfigs (G.K1 cfg) = Map.singleton (name cfg) (KafkaConfigInstance cfg)
  gdefaultConfigs = G.K1 (defaultConfig @c)

-- | Metadata wrapper: carries no config of its own, so every method simply
-- unwraps or rewraps and defers to the inner representation.
instance GKafkaConfigs f => GKafkaConfigs (G.M1 i c f) where
  gmkConfigs = fmap G.M1 . gmkConfigs
  gdumpConfigs = gdumpConfigs . G.unM1
  gdefaultConfigs = G.M1 gdefaultConfigs

-- | Product of two representation pieces: both sides are built from the
-- same lookup, and their dumped maps are merged (left-biased on key clash).
instance (GKafkaConfigs a, GKafkaConfigs b) => GKafkaConfigs (a G.:*: b) where
  gmkConfigs lk = do
    l <- gmkConfigs lk
    r <- gmkConfigs lk
    pure (l G.:*: r)
  gdumpConfigs (l G.:*: r) = gdumpConfigs l <> gdumpConfigs r
  gdefaultConfigs = gdefaultConfigs G.:*: gdefaultConfigs

#define MK_CONFIG_PAIR(configType) \
let dc = defaultConfig @configType in (name dc, (KafkaConfigInstance dc, fmap KafkaConfigInstance . fromText @configType))

-- | Resolve one config from a map of raw values.
--
-- The entry is looked up under the name of @dc@. When the key is absent or
-- maps to 'Nothing', @dc@ itself is returned; otherwise the text is parsed
-- with 'fromText'.
lookupConfig :: KafkaConfig kc => kc -> Map.Map T.Text (Maybe T.Text) -> Either T.Text kc
lookupConfig dc configs = maybe (Right dc) fromText rawValue
  where
    rawValue = M.join (Map.lookup (name dc) configs)

-- TODO: build from KafkaTopicConfigs
allTopicConfigs :: Map.Map T.Text (KafkaConfigInstance, T.Text -> Either T.Text KafkaConfigInstance)
allTopicConfigs = Map.fromList
[ MK_CONFIG_PAIR(CleanupPolicy)
, MK_CONFIG_PAIR(RetentionMs)

getTopicConfig :: T.Text -> Maybe T.Text -> Either T.Text KafkaConfigInstance
getTopicConfig configName configValue =
case Map.lookup configName allTopicConfigs of
Nothing -> Left $ "unsupported config name:" <> configName
Just (dc, fromText') ->
case configValue of
Nothing -> Right dc
Just textValue -> fromText' textValue
-- | Every topic-level config at its default value, keyed by config name.
allTopicConfigs :: ConfigMap
allTopicConfigs = dumpConfigs topicDefaults
  where
    topicDefaults = defaultConfigs :: KafkaTopicConfigs

-- | Look up a single topic config by name.
--
-- All topic configs are first built from @configValues@ (so a parse error in
-- any of them is returned as 'Left'), and the requested name is then picked
-- out of the resulting map. A name that is not a known topic config yields
-- an "unsupported config name" error.
getTopicConfig :: T.Text -> Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaConfigInstance
getTopicConfig configName configValues =
  mkConfigs @KafkaTopicConfigs rawValueOf >>= pick . dumpConfigs
  where
    rawValueOf key = M.join (Map.lookup key configValues)
    pick resolved =
      maybe (Left $ "unsupported config name:" <> configName) Right
            (Map.lookup configName resolved)

-- Utils
-- | Parse an optionally signed decimal integer from text.
--
-- Returns 'Left' with the reader's message when no number can be read, and
-- also when anything other than whitespace follows the number, so that
-- inputs such as @"12abc"@ or @"1.5"@ are rejected instead of being
-- silently truncated to their leading digits.
textToIntE :: T.Text -> Either T.Text Int
textToIntE v =
  case T.signed T.decimal v of
    Left msg -> Left (T.pack msg)
    Right (intVal, rest)
      | T.null (T.strip rest) -> Right intVal
      | otherwise             -> Left (T.pack "invalid int value:" <> v)

-- | Render any 'Show'-able value as 'T.Text' via its 'show' output.
intToText :: Show a => a -> T.Text
intToText = T.pack . show
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{-# LANGUAGE OverloadedStrings #-}

module HStream.Kafka.Server.Config.KafkaConfigManager where
import qualified Control.Monad as M
import qualified Data.Aeson as J
import Data.Bifunctor (Bifunctor (bimap))
import qualified Data.Map as Map
Expand Down Expand Up @@ -63,7 +62,7 @@ listTopicConfigs KafkaConfigManager{..} topic keys = do
, value=KC.value cfg
getConfig :: Map.Map T.Text (Maybe T.Text) -> T.Text -> Either T.Text K.DescribeConfigsResourceResult
getConfig configs configName = getConfigByInstance <$> KC.getTopicConfig configName (M.join (Map.lookup configName configs))
getConfig configs configName = getConfigByInstance <$> KC.getTopicConfig configName configs

getErrorResponse :: KC.KafkaConfigResource -> T.Text -> T.Text -> K.DescribeConfigsResult
getErrorResponse rt rn msg = K.DescribeConfigsResult
Expand Down
2 changes: 0 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ data ServerOpts = ServerOpts
, _serverGossipPort :: !Word16
, _gossipOpts :: !GossipOpts

, _topicRepFactor :: !Int
, _partitionNums :: !Int
, _maxRecordSize :: !Int
, _seedNodes :: ![(ByteString, Int)]

Expand Down
10 changes: 8 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Core/Topic.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE OverloadedRecordDot #-}

module HStream.Kafka.Server.Core.Topic
( createTopic
Expand Down Expand Up @@ -29,7 +31,9 @@ createTopic :: ServerContext -> Text -> Int16 -> Int32 -> Map.Map T.Text (Maybe
createTopic ServerContext{..} name replicationFactor numPartitions configs = do
let streamId = S.transToTopicStreamName name
timeStamp <- BaseTime.getSystemNsTimestamp
let replica = if replicationFactor == -1 then scDefaultTopicRepFactor else fromIntegral replicationFactor
let replica = if replicationFactor == -1
then kafkaBrokerConfigs.defaultReplicationFactor._value
else fromIntegral replicationFactor
case KC.mkKafkaTopicConfigs configs of
Left msg -> do $ "create topic failed, invaid config:" <> msg
Expand All @@ -52,7 +56,9 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do
Log.warning $ "Exception occurs when creating stream " <> (show streamId) <> ": " <> (show e)
Right _ -> do
let partitions = if numPartitions == -1 then scDefaultPartitionNum else fromIntegral numPartitions
let partitions = if numPartitions == -1
then kafkaBrokerConfigs.numPartitions._value
else fromIntegral numPartitions
let keyTups = CommonTypes.devideKeySpace partitions
shards_e <-
try $ forM (keyTups `zip` [0..]) $ \((startKey, endKey), i) -> do
Expand Down
4 changes: 3 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ handleMetadataV4 ctx@ServerContext{..} _ [email protected]{..} = do
createResp <-
if kafkaBrokerConfigs.autoCreateTopicsEnable._value && allowAutoTopicCreation
then do
let defaultReplicas = kafkaBrokerConfigs.defaultReplicationFactor._value
defaultNumPartitions = kafkaBrokerConfigs.numPartitions._value
resp <- forM needCreate $ \topic -> do
(code, shards) <- createTopic ctx topic (fromIntegral scDefaultTopicRepFactor) (fromIntegral scDefaultPartitionNum) Map.empty
(code, shards) <- createTopic ctx topic (fromIntegral defaultReplicas) (fromIntegral defaultNumPartitions) Map.empty
if code /= K.NONE
then do
return $ K.MetadataResponseTopicV1 code topic False K.emptyKaArray
Expand Down
4 changes: 0 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ data ServerContext = ServerContext
{ serverID :: !Word32
, serverOpts :: !ServerOpts
, scAdvertisedListenersKey :: !(Maybe Text)
, scDefaultTopicRepFactor :: !Int
, scDefaultPartitionNum :: !Int
, scMaxRecordSize :: !Int
, metaHandle :: !MetaHandle
, scStatsHolder :: !Stats.StatsHolder
Expand Down Expand Up @@ -56,8 +54,6 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
{ serverID = _serverID
, serverOpts = opts
, scAdvertisedListenersKey = Nothing
, scDefaultTopicRepFactor = _topicRepFactor
, scDefaultPartitionNum = _partitionNums
, scMaxRecordSize = _maxRecordSize
, metaHandle = mh
, scStatsHolder = statsHolder
Expand Down