Skip to content

Commit

Permalink
Merge branch 'main' into sasl
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Nov 10, 2023
2 parents d97598d + ff7b2b6 commit 3d42474
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 114 deletions.
34 changes: 16 additions & 18 deletions common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,22 @@ lookupNodePersist metaHandle gossipContext loadBalanceHashRing
case find ((nodeId == ) . A.serverNodeId) serverList of
Just theNode -> return theNode
Nothing -> do
(epoch', hashRing) <- readTVarIO loadBalanceHashRing
if epoch' > epoch
then do
theNode' <- getResNode hashRing key advertisedListenersKey
try (M.updateMeta @TaskAllocation metaId
(TaskAllocation epoch' (A.serverNodeId theNode'))
(Just version) metaHandle) >>= \case
Left (e :: SomeException) -> do
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
Right () -> return theNode'
else do
let errmsg = "the server has not yet synced with the latest member list"
Log.warning $ "lookupNodePersist: " <> Log.buildString errmsg
throwIO $ HE.ResourceAllocationException errmsg
(epoch', hashRing) <- atomically $ do
(epoch', hashRing) <- readTVar loadBalanceHashRing
if epoch' > epoch
then pure (epoch', hashRing)
else retry
theNode' <- getResNode hashRing key advertisedListenersKey
try (M.updateMeta @TaskAllocation metaId
(TaskAllocation epoch' (A.serverNodeId theNode'))
(Just version) metaHandle) >>= \case
Left (e :: SomeException) -> do
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
Right () -> return theNode'

data KafkaResource
= KafkaResTopic Text
Expand Down
2 changes: 2 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libevent-dev \
libevent-openssl-2.1-7 \
libgoogle-glog-dev \
libgsasl7 \
libmysqlclient-dev \
libpython3.10 \
librdkafka++1 \
Expand Down Expand Up @@ -166,6 +167,7 @@ RUN mkdir -p /etc/bash_completion.d && \
grep -wq '^source /etc/profile.d/bash_completion.sh' /etc/bash.bashrc || echo 'source /etc/profile.d/bash_completion.sh' >> /etc/bash.bashrc && \
/usr/local/bin/hadmin --bash-completion-script /usr/local/bin/hadmin > /etc/bash_completion.d/hadmin && \
/usr/local/bin/hadmin-store --bash-completion-script /usr/local/bin/hadmin-store > /etc/bash_completion.d/hadmin-store && \
/usr/local/bin/hstream-kafka-cli --bash-completion-script /usr/local/bin/hstream-kafka-cli > /etc/bash_completion.d/hstream-kafka-cli && \
/usr/local/bin/hstream --bash-completion-script /usr/local/bin/hstream > /etc/bash_completion.d/hstream

EXPOSE 6560 6570
Expand Down
13 changes: 7 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import qualified Kafka.Protocol.Service as K
#define hsc_cv_handler(key, start, end) \
{ \
for (int i = start; i <= end; i++) { \
hsc_printf("handle%sV%d :: K.RequestContext -> K.%sRequestV%d -> IO " \
hsc_printf("handle%sV%d :: ServerContext -> K.RequestContext -> " \
"K.%sRequestV%d -> IO " \
"K.%sResponseV%d \n", \
#key, i, #key, i, #key, i); \
hsc_printf("handle%sV%d ctx req = K.", #key, i); \
hsc_printf("handle%sV%d sc ctx req = K.", #key, i); \
hsc_lowerfirst(#key); \
hsc_printf("ResponseToV%d <$> handle%s ctx (K.", i, #key); \
hsc_printf("ResponseToV%d <$> handle%s sc ctx (K.", i, #key); \
hsc_lowerfirst(#key); \
hsc_printf("RequestFromV%d req)\n", i); \
} \
Expand All @@ -48,17 +49,19 @@ import qualified Kafka.Protocol.Service as K
} \
hsc_printf("K.hd (K.RPC :: K.RPC K.HStreamKafkaV%d \"", i); \
hsc_lowerfirst(#key); \
hsc_printf("\") handle%sV%d\n", #key, i); \
hsc_printf("\") (handle%sV%d sc)\n", #key, i); \
} \
}

-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3
#cv_handler Fetch, 0, 2

handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler Fetch, 0, 2

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc)
Expand All @@ -72,8 +75,6 @@ handlers sc =

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "produce") (handleProduceV2 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "fetch") (handleFetchV2 sc)

-- Offsets
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "listOffsets") (handleListOffsetsV1 sc)
Expand Down
7 changes: 5 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ import qualified Kafka.Protocol.Service as K
--------------------

handleApiVersions
:: K.RequestContext -> K.ApiVersionsRequest -> IO K.ApiVersionsResponse
handleApiVersions _ _ = do
:: ServerContext
-> K.RequestContext
-> K.ApiVersionsRequest
-> IO K.ApiVersionsResponse
handleApiVersions _ _ _ = do
let apiKeys = K.KaArray
. Just
. (V.map apiVersionV0To)
Expand Down
49 changes: 19 additions & 30 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{-# LANGUAGE OverloadedRecordDot #-}

module HStream.Kafka.Server.Handler.Consume
( handleFetchV2
( handleFetch
) where

import Control.Exception
Expand Down Expand Up @@ -37,14 +37,14 @@ type RecordTable =
(GV.Growing V.Vector GV.RealWorld K.RecordFormat)

-- NOTE: this behaviour is not the same as kafka broker
handleFetchV2
handleFetch
:: ServerContext -> K.RequestContext
-> K.FetchRequestV2 -> IO K.FetchResponseV2
handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
-> K.FetchRequest -> IO K.FetchResponse
handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
-- kafka broker just throw java.lang.RuntimeException if topics is null, here
-- we do the same.
let K.NonNullKaArray topicReqs = r.topics
topics <- V.forM topicReqs $ \K.FetchTopicV2{..} -> do
topics <- V.forM topicReqs $ \K.FetchTopic{..} -> do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topic)
let K.NonNullKaArray partitionReqs = partitions
ps <- V.forM partitionReqs $ \p -> do
Expand All @@ -61,9 +61,9 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
case elsn of
Left pd -> pure pd
Right _ -> error "LogicError: this should not be right"
pure $ K.FetchableTopicResponseV2 topic (K.NonNullKaArray respPartitionDatas)
let resp = K.FetchResponseV2 0{- TODO: throttleTimeMs -} (K.NonNullKaArray respTopics)
throwIO $ RetFetchRespV2 resp
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
throwIO $ K.FetchResponseEx resp

-- New reader
reader <- S.newLDReader scLDClient (fromIntegral numOfReads) Nothing
Expand Down Expand Up @@ -109,7 +109,7 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
Right (_startlsn, _endlsn, hioffset) -> do
mgv <- HT.lookup readRecords logid
case mgv of
Nothing -> pure $ K.PartitionDataV2 p.partition K.NONE hioffset (Just "")
Nothing -> pure $ K.PartitionData p.partition K.NONE hioffset (Just "")
Just gv -> do
v <- GV.unsafeFreeze gv
-- This should not be Nothing, because if we found the key in
Expand All @@ -127,20 +127,9 @@ handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
let b = V.foldl (<>) (BB.byteString fstRecordBytes)
(V.map (BB.byteString . K.unCompactBytes . (.recordBytes)) vs)
bs = BS.toStrict $ BB.toLazyByteString b
pure $ K.PartitionDataV2 p.partition K.NONE hioffset (Just bs)
pure $ K.FetchableTopicResponseV2 topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponseV2 0{- TODO: throttleTimeMs -} (K.NonNullKaArray respTopics)

-------------------------------------------------------------------------------

-- TODO: move to Kafka.Protocol.Message.Struct
newtype RetFetchRespV2 = RetFetchRespV2 K.FetchResponseV2
deriving (Show, Eq)

instance Exception RetFetchRespV2

catchFetchV2 :: IO K.FetchResponseV2 -> IO K.FetchResponseV2
catchFetchV2 act = act `catch` \(RetFetchRespV2 resp) -> pure resp
pure $ K.PartitionData p.partition K.NONE hioffset (Just bs)
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}

-------------------------------------------------------------------------------

Expand All @@ -152,7 +141,7 @@ getPartitionLsn
-> K.OffsetManager
-> S.C_LogID -> Int32
-> Int64 -- ^ kafka start offset
-> IO (Either K.PartitionDataV2 (S.LSN, S.LSN, Int64))
-> IO (Either K.PartitionData (S.LSN, S.LSN, Int64))
getPartitionLsn ldclient om logid partition offset = do
m <- K.getLatestOffsetWithLsn om logid
case m of
Expand All @@ -167,19 +156,19 @@ getPartitionLsn ldclient om logid partition offset = do
| offset == highwaterOffset ->
pure $ Right (tailLsn + 1, tailLsn, highwaterOffset)
| offset > highwaterOffset ->
pure $ Left $ errorPartitionResponseV2 partition K.OFFSET_OUT_OF_RANGE
pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE
-- ghc is not smart enough to detact my partten matching is complete
| otherwise -> error "This should not be reached (getPartitionLsn)"
Nothing -> do
Log.debug $ "Partition " <> Log.build logid <> " is empty"
if offset == 0
then pure $ Right (S.LSN_MIN, S.LSN_INVALID, 0)
else pure $ Left $ errorPartitionResponseV2 partition K.OFFSET_OUT_OF_RANGE
else pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE

errorPartitionResponseV2 :: Int32 -> K.ErrorCode -> K.PartitionDataV2
errorPartitionResponseV2 partitionIndex ec =
K.PartitionDataV2 partitionIndex ec (-1) (Just "")
{-# INLINE errorPartitionResponseV2 #-}
errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData
errorPartitionResponse partitionIndex ec =
K.PartitionData partitionIndex ec (-1) (Just "")
{-# INLINE errorPartitionResponse #-}

foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a
foldWhileM !a f = do
Expand Down
Loading

0 comments on commit 3d42474

Please sign in to comment.