Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka SparseOffset #1817

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ data StorageOptions = StorageOptions

data ExperimentalFeature
= ExperimentalCppServer
| ExperimentalSparseOffset
deriving (Show, Eq)

parseExperimentalFeature :: O.ReadM ExperimentalFeature
parseExperimentalFeature = O.eitherReader $ \case
"cpp" -> Right ExperimentalCppServer
x -> Left $ "cannot parse experimental feature: " <> x
"cpp" -> Right ExperimentalCppServer
"sparse-offset" -> Right ExperimentalSparseOffset
x -> Left $ "cannot parse experimental feature: " <> x
59 changes: 56 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module HStream.Kafka.Server.Handler
( handlers
, sparseOffsetHandlers
, unAuthedHandlers
) where

Expand All @@ -13,10 +14,13 @@ import HStream.Kafka.Server.Handler.Group
import HStream.Kafka.Server.Handler.Offset
import HStream.Kafka.Server.Handler.Produce
import HStream.Kafka.Server.Handler.Security
import HStream.Kafka.Server.Handler.SparseOffset.Consume
import HStream.Kafka.Server.Handler.SparseOffset.Offset
import HStream.Kafka.Server.Handler.SparseOffset.Produce
import HStream.Kafka.Server.Handler.Topic
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -93,6 +97,11 @@ import qualified Kafka.Protocol.Service as K
#cv_handler CreateAcls, 0, 0
#cv_handler DeleteAcls, 0, 0

-- SparseOffset
#cv_handler ListOffsets, 0, 2, SparseOffset
#cv_handler Produce, 0, 7, SparseOffset
#cv_handler Fetch, 0, 6, SparseOffset

handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
Expand Down Expand Up @@ -142,3 +151,47 @@ unAuthedHandlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler SaslHandshake, 0, 1
]

sparseOffsetHandlers :: ServerContext -> [K.ServiceHandler]
sparseOffsetHandlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler ListOffsets, 0, 2, SparseOffset
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 7, SparseOffset
, #mk_handler InitProducerId, 0, 0
-- Read
, #mk_handler Fetch, 0, 6, SparseOffset

, #mk_handler FindCoordinator, 0, 1

, #mk_handler CreateTopics, 0, 2
, #mk_handler DeleteTopics, 0, 1
, #mk_handler CreatePartitions, 0, 1

-- Group
, #mk_handler JoinGroup, 0, 2
, #mk_handler SyncGroup, 0, 1
, #mk_handler LeaveGroup, 0, 1
, #mk_handler Heartbeat, 0, 1
, #mk_handler ListGroups, 0, 1
, #mk_handler DescribeGroups, 0, 1

, #mk_handler OffsetCommit, 0, 3
, #mk_handler OffsetFetch, 0, 3

-- configs
, #mk_handler DescribeConfigs, 0, 0

-- Sasl
, #mk_handler SaslHandshake, 0, 1, AfterAuth
, #mk_handler SaslAuthenticate, 0, 0

-- For hstream
, #mk_handler HadminCommand, 0, 0

-- ACL
, #mk_handler DescribeAcls, 0, 0
, #mk_handler CreateAcls, 0, 0
, #mk_handler DeleteAcls, 0, 0
]
30 changes: 23 additions & 7 deletions hstream/app/lib/KafkaServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ app config@ServerOpts{..} = do

-- Experimental features
let usingCppServer = ExperimentalCppServer `elem` experimentalFeatures
Async.withAsync (serve serverContext netOpts usingCppServer) $ \a -> do
usingSparseOffset = ExperimentalSparseOffset `elem` experimentalFeatures
Async.withAsync (serve serverContext netOpts usingCppServer usingSparseOffset) $ \a -> do
-- start gossip
a1 <- startGossip _serverHost gossipContext
Async.link2Only (const True) a a1
Expand All @@ -140,6 +141,7 @@ app config@ServerOpts{..} = do
_serverAdvertisedListeners
_listenersSecurityProtocolMap
usingCppServer
usingSparseOffset
forM_ as (Async.link2Only (const True) a)
-- wait the default server
waitGossipBoot gossipContext
Expand All @@ -153,8 +155,10 @@ app config@ServerOpts{..} = do
serve :: ServerContext -> K.ServerOptions
-> Bool
-- ^ ExperimentalFeature: ExperimentalCppServer
-> Bool
-- ^ ExperimentalFeature: ExperimentalSparseOffset
-> IO ()
serve sc@ServerContext{..} netOpts usingCppServer = do
serve sc@ServerContext{..} netOpts usingCppServer usingSparseOffset = do
Log.i "************************"
hPutStrLn stderr banner
Log.i "************************"
Expand Down Expand Up @@ -190,10 +194,15 @@ serve sc@ServerContext{..} netOpts usingCppServer = do
Log.info $ "Starting"
<> if isJust (K.serverSslOptions netOpts') then " secure " else " insecure "
<> "kafka server..."
handlers <- if usingSparseOffset
then do
Log.warning "Using a experimental feature: SparseOffset"
pure K.sparseOffsetHandlers
else pure K.handlers
if usingCppServer
then do Log.warning "Using a still-in-development c++ kafka server!"
K.runCppServer netOpts' sc K.handlers
else K.runHsServer netOpts' sc K.unAuthedHandlers K.handlers
K.runCppServer netOpts' sc handlers
else K.runHsServer netOpts' sc K.unAuthedHandlers handlers
where
exceptionHandlers =
[ Handler $ \(_ :: HE.RQLiteRowNotFound) -> return ()
Expand All @@ -208,10 +217,12 @@ serveListeners
-> ListenersSecurityProtocolMap
-> Bool
-- ^ ExperimentalFeature: ExperimentalCppServer
-> Bool
-- ^ ExperimentalFeature: ExperimentalSparseOffset
-> IO [Async.Async ()]
serveListeners sc netOpts
securityMap listeners listenerSecurityMap
usingCppServer
usingCppServer usingSparseOffset
= do
let listeners' = [(k, v) | (k, vs) <- Map.toList listeners, v <- Set.toList vs]
forM listeners' $ \(key, I.Listener{..}) -> Async.async $ do
Expand Down Expand Up @@ -240,10 +251,15 @@ serveListeners sc netOpts
<> Log.build key <> ":"
<> Log.build listenerAddress <> ":"
<> Log.build listenerPort
handlers <- if usingSparseOffset
then do
Log.warning "Using a experimental feature: SparseOffset"
pure K.sparseOffsetHandlers
else pure K.handlers
if usingCppServer
then do Log.warning "Using a still-in-development c++ kafka server!"
K.runCppServer netOpts' sc' K.handlers
else K.runHsServer netOpts' sc' K.unAuthedHandlers K.handlers
K.runCppServer netOpts' sc' handlers
else K.runHsServer netOpts' sc' K.unAuthedHandlers handlers

-------------------------------------------------------------------------------

Expand Down
Loading