Commit

Kafka: enable concurrent writes to different partitions in one ProduceRequest (#1684)

4eUeP authored Nov 21, 2023
1 parent df3b1cc commit a617008
Showing 7 changed files with 207 additions and 59 deletions.
7 changes: 4 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
@@ -56,6 +56,7 @@ import qualified Kafka.Protocol.Service as K
-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3
#cv_handler Produce, 0, 2
#cv_handler Fetch, 0, 2

#cv_handler SaslHandshake, 0, 1
@@ -64,6 +65,9 @@ import qualified Kafka.Protocol.Service as K
handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
-- Write
, #mk_handler Produce, 0, 2
-- Read
, #mk_handler Fetch, 0, 2

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
@@ -73,11 +77,8 @@ handlers sc =
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV4 "metadata") (handleMetadataV4 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "produce") (handleProduceV2 sc)

-- Offsets
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "listOffsets") (handleListOffsetsV1 sc)
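
Note: the #cv_handler Produce, 0, 2 and #mk_handler Produce, 0, 2 macro calls register the new version-agnostic handleProduce for Produce versions 0 through 2, replacing the V2-only "produce" entry deleted from the K.hd list above. A self-contained sketch of the dispatch pattern the macros presumably generate (every name below is a hypothetical stand-in, not the actual expansion):

-- One version-agnostic handler serves all wire versions; the generated
-- wrappers only convert requests and responses at the edges.
data ReqV0  = ReqV0    -- per-version wire type (stand-in)
data RespV0 = RespV0
data Req    = Req      -- version-agnostic "total" type (stand-in)
data Resp   = Resp

fromV0 :: ReqV0 -> Req
fromV0 _ = Req

toV0 :: Resp -> RespV0
toV0 _ = RespV0

handleProduce :: Req -> IO Resp       -- the single shared handler
handleProduce _ = pure Resp

handleProduceV0 :: ReqV0 -> IO RespV0 -- what a per-version wrapper does
handleProduceV0 req = toV0 <$> handleProduce (fromV0 req)
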
42 changes: 24 additions & 18 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -1,7 +1,8 @@
module HStream.Kafka.Server.Handler.Produce
( handleProduceV2
( handleProduce
) where

import qualified Control.Concurrent.Async as Async
import Control.Monad
import Data.ByteString (ByteString)
import Data.Int
@@ -34,39 +35,44 @@ import qualified Kafka.Protocol.Service as K
-- guarantees that the record will not be lost as long as at least one
-- in-sync replica remains alive. This is the strongest available
-- guarantee.
handleProduceV2
handleProduce
:: ServerContext
-> K.RequestContext
-> K.ProduceRequestV2
-> IO K.ProduceResponseV2
handleProduceV2 ServerContext{..} _ K.ProduceRequestV2{..} = do
-> K.ProduceRequest
-> IO K.ProduceResponse
handleProduce ServerContext{..} _ req = do
-- TODO: handle request args: acks, timeoutMs
let topicData = fromMaybe V.empty (K.unKaArray req.topicData)

let topicData' = fromMaybe V.empty (K.unKaArray topicData)
responses <- V.forM topicData' $ \K.TopicProduceDataV2{..} -> do
responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
-- A topic is a stream. Here we do not need to check for the topic's
-- existence, because the metadata api does(?)
let topic = S.transToTopicStreamName name
partitions <- S.listStreamPartitionsOrdered scLDClient topic
let partitionData' = fromMaybe V.empty (K.unKaArray partitionData)
partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV2{..} -> do
let Just (_, logid) = partitions V.!? (fromIntegral index) -- TODO: handle Nothing
let Just recordBytes' = recordBytes -- TODO: handle Nothing
partitions <- S.listStreamPartitionsOrdered
scLDClient (S.transToTopicStreamName topic.name)
let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData)
-- TODO: limit total concurrency?
let loopPart = if V.length partitionData > 1
then Async.forConcurrently
else V.forM
partitionResponses <- loopPart partitionData $ \partition -> do
let Just (_, logid) = partitions V.!? (fromIntegral partition.index) -- TODO: handle Nothing
let Just recordBytes = partition.recordBytes -- TODO: handle Nothing
Log.debug1 $ "Append to logid " <> Log.build logid
<> "(" <> Log.build index <> ")"
<> "(" <> Log.build partition.index <> ")"

-- Write appends
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager logid recordBytes'
appendRecords True scLDClient scOffsetManager logid recordBytes

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN

-- TODO: logAppendTimeMs; only LogAppendTime is supported now
pure $ K.PartitionProduceResponseV2 index K.NONE offset appendCompTimestamp
pure $ K.PartitionProduceResponse partition.index K.NONE offset appendCompTimestamp

pure $ K.TopicProduceResponseV2 name (K.KaArray $ Just partitionResponses)
pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)

pure $ K.ProduceResponseV2 (K.KaArray $ Just responses) 0{- TODO: throttleTimeMs -}
pure $ K.ProduceResponse (K.KaArray $ Just responses) 0{- TODO: throttleTimeMs -}

-------------------------------------------------------------------------------

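
The heart of this change is the loopPart selection above: when a ProduceRequest carries more than one partition for a topic, the per-partition appends run concurrently via Async.forConcurrently; a single partition keeps the sequential V.forM and avoids any forking overhead. A self-contained sketch of the pattern (Part, Result, and appendOne are hypothetical stand-ins for PartitionProduceData and the real appendRecords pipeline):

import qualified Control.Concurrent.Async as Async
import qualified Data.Vector              as V

-- Hypothetical stand-ins for the real request/response types.
data Part   = Part   { partIndex :: Int, payload :: String }
data Result = Result { resIndex  :: Int, baseOff :: Int }

-- Stand-in for the real append pipeline (appendRecords).
appendOne :: Part -> IO Result
appendOne p = pure (Result (partIndex p) 0)

-- Fork only when it can help: a single-partition request stays on the
-- calling thread; a multi-partition request fans out, one green thread
-- per partition. forConcurrently preserves the input order of results.
producePartitions :: V.Vector Part -> IO (V.Vector Result)
producePartitions parts =
  let loopPart = if V.length parts > 1
                   then Async.forConcurrently
                   else V.forM
  in loopPart parts appendOne

Per-partition ordering is unaffected, since all records for one partition still go through a single append call; only distinct partitions proceed in parallel. The "TODO: limit total concurrency?" comment flags the open question of bounding the fan-out for requests that touch very many partitions.
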
15 changes: 7 additions & 8 deletions hstream-kafka/cbits/hs_kafka_client.cpp
@@ -21,12 +21,11 @@ class HsDeliveryReportCb : public RdKafka::DeliveryReportCb {
/* If message.err() is non-zero the message delivery failed permanently
* for the message. */
if (message.err())
std::cerr << "Message delivery failed: " << message.errstr()
<< std::endl;
std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
else
std::cerr << "Message delivered to topic " << message.topic_name()
<< " [" << message.partition() << "] at offset "
<< message.offset() << std::endl;
std::cerr << "Message delivered to topic " << message.topic_name() << " ["
<< message.partition() << "] at offset " << message.offset()
<< std::endl;
}
};

@@ -251,7 +250,7 @@ void hs_delete_producer(HsProducer* p) { delete p; }

HsInt hs_producer_produce(HsProducer* p, const char* topic_, HsInt topic_size_,
int32_t partition_, const char* payload_,
HsInt payload_size_, const char* key_,
HsInt payload_size_, const char* key_,
HsInt key_size_, std::string* errstr) {

std::string topic(topic_, topic_size_);
@@ -308,8 +307,8 @@ void hs_producer_flush(HsProducer* p) {
p->producer->flush(10 * 1000 /* wait for max 10 seconds */);

if (p->producer->outq_len() > 0) {
std::cerr << p->producer->outq_len()
<< " message(s) were not delivered" << std::endl;
std::cerr << p->producer->outq_len() << " message(s) were not delivered"
<< std::endl;
}
}

6 changes: 4 additions & 2 deletions hstream-kafka/hstream-kafka.cabal
@@ -125,14 +125,14 @@ library
HStream.Kafka.Server.Config.FromCli
HStream.Kafka.Server.Config.FromJson
HStream.Kafka.Server.Config.Types
HStream.Kafka.Server.Core.Topic
HStream.Kafka.Server.Handler.Basic
HStream.Kafka.Server.Handler.Consume
HStream.Kafka.Server.Handler.Group
HStream.Kafka.Server.Handler.Offset
HStream.Kafka.Server.Handler.Produce
HStream.Kafka.Server.Handler.Security
HStream.Kafka.Server.Handler.Topic
HStream.Kafka.Server.Core.Topic

cxx-sources: cbits/hs_kafka_client.cpp
cxx-options: -std=c++17
@@ -141,10 +141,12 @@ library
build-tool-depends: hsc2hs:hsc2hs
build-depends:
, aeson
, async ^>=2.2
, attoparsec
, base >=4.11 && <5
, base64
, bytestring
, clock
, colourista ^>=0.1.0.1
, containers
, directory
@@ -171,12 +173,12 @@ library
, yaml
, Z-Data
, zoovisitor
, clock

default-language: GHC2021
default-extensions:
DerivingStrategies
LambdaCase
MultiWayIf
OverloadedRecordDot
OverloadedStrings
RecordWildCards
88 changes: 77 additions & 11 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
@@ -498,21 +498,51 @@ type OffsetFetchResponsePartitionV2 = OffsetFetchResponsePartitionV0

type OffsetFetchResponseTopicV2 = OffsetFetchResponseTopicV0

data PartitionProduceDataV2 = PartitionProduceDataV2
data PartitionProduceDataV0 = PartitionProduceDataV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceDataV2
instance Serializable PartitionProduceDataV0

data TopicProduceDataV2 = TopicProduceDataV2
data TopicProduceDataV0 = TopicProduceDataV0
{ name :: !Text
-- ^ The topic name.
, partitionData :: !(KaArray PartitionProduceDataV2)
, partitionData :: !(KaArray PartitionProduceDataV0)
-- ^ Each partition to produce to.
} deriving (Show, Eq, Generic)
instance Serializable TopicProduceDataV2
instance Serializable TopicProduceDataV0

type PartitionProduceDataV1 = PartitionProduceDataV0

type TopicProduceDataV1 = TopicProduceDataV0

type PartitionProduceDataV2 = PartitionProduceDataV0

type TopicProduceDataV2 = TopicProduceDataV0

data PartitionProduceResponseV0 = PartitionProduceResponseV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, baseOffset :: {-# UNPACK #-} !Int64
-- ^ The base offset.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceResponseV0

data TopicProduceResponseV0 = TopicProduceResponseV0
{ name :: !Text
-- ^ The topic name
, partitionResponses :: !(KaArray PartitionProduceResponseV0)
-- ^ Each partition that we produced to within the topic.
} deriving (Show, Eq, Generic)
instance Serializable TopicProduceResponseV0

type PartitionProduceResponseV1 = PartitionProduceResponseV0

type TopicProduceResponseV1 = TopicProduceResponseV0

data PartitionProduceResponseV2 = PartitionProduceResponseV2
{ index :: {-# UNPACK #-} !Int32
@@ -921,17 +951,35 @@ data OffsetFetchResponseV2 = OffsetFetchResponseV2
} deriving (Show, Eq, Generic)
instance Serializable OffsetFetchResponseV2

data ProduceRequestV2 = ProduceRequestV2
data ProduceRequestV0 = ProduceRequestV0
{ acks :: {-# UNPACK #-} !Int16
-- ^ The number of acknowledgments the producer requires the leader to have
-- received before considering a request complete. Allowed values: 0 for no
-- acknowledgments, 1 for only the leader and -1 for the full ISR.
, timeoutMs :: {-# UNPACK #-} !Int32
-- ^ The timeout to await a response in milliseconds.
, topicData :: !(KaArray TopicProduceDataV2)
, topicData :: !(KaArray TopicProduceDataV0)
-- ^ Each topic to produce to.
} deriving (Show, Eq, Generic)
instance Serializable ProduceRequestV2
instance Serializable ProduceRequestV0

type ProduceRequestV1 = ProduceRequestV0

type ProduceRequestV2 = ProduceRequestV0

newtype ProduceResponseV0 = ProduceResponseV0
{ responses :: (KaArray TopicProduceResponseV0)
} deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV0

data ProduceResponseV1 = ProduceResponseV1
{ responses :: !(KaArray TopicProduceResponseV0)
-- ^ Each produce response
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV1

data ProduceResponseV2 = ProduceResponseV2
{ responses :: !(KaArray TopicProduceResponseV2)
@@ -1002,7 +1050,8 @@ data HStreamKafkaV0
instance Service HStreamKafkaV0 where
type ServiceName HStreamKafkaV0 = "HStreamKafkaV0"
type ServiceMethods HStreamKafkaV0 =
'[ "fetch"
'[ "produce"
, "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
@@ -1021,6 +1070,13 @@ instance Service HStreamKafkaV0 where
, "saslAuthenticate"
]

instance HasMethodImpl HStreamKafkaV0 "produce" where
type MethodName HStreamKafkaV0 "produce" = "produce"
type MethodKey HStreamKafkaV0 "produce" = 0
type MethodVersion HStreamKafkaV0 "produce" = 0
type MethodInput HStreamKafkaV0 "produce" = ProduceRequestV0
type MethodOutput HStreamKafkaV0 "produce" = ProduceResponseV0

instance HasMethodImpl HStreamKafkaV0 "fetch" where
type MethodName HStreamKafkaV0 "fetch" = "fetch"
type MethodKey HStreamKafkaV0 "fetch" = 1
@@ -1145,7 +1201,8 @@ data HStreamKafkaV1
instance Service HStreamKafkaV1 where
type ServiceName HStreamKafkaV1 = "HStreamKafkaV1"
type ServiceMethods HStreamKafkaV1 =
'[ "fetch"
'[ "produce"
, "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
@@ -1154,6 +1211,13 @@ instance Service HStreamKafkaV1 where
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV1 "produce" where
type MethodName HStreamKafkaV1 "produce" = "produce"
type MethodKey HStreamKafkaV1 "produce" = 0
type MethodVersion HStreamKafkaV1 "produce" = 1
type MethodInput HStreamKafkaV1 "produce" = ProduceRequestV1
type MethodOutput HStreamKafkaV1 "produce" = ProduceResponseV1

instance HasMethodImpl HStreamKafkaV1 "fetch" where
type MethodName HStreamKafkaV1 "fetch" = "fetch"
type MethodKey HStreamKafkaV1 "fetch" = 1
@@ -1324,7 +1388,7 @@ instance Show ApiKey where

supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 2 2
[ ApiVersionV0 (ApiKey 0) 0 2
, ApiVersionV0 (ApiKey 1) 0 2
, ApiVersionV0 (ApiKey 2) 0 1
, ApiVersionV0 (ApiKey 3) 0 4
@@ -1345,6 +1409,8 @@ supportedApiVersions =
]

getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16)
getHeaderVersion (ApiKey 0) 0 = (1, 0)
getHeaderVersion (ApiKey 0) 1 = (1, 0)
getHeaderVersion (ApiKey 0) 2 = (1, 0)
getHeaderVersion (ApiKey 1) 0 = (1, 0)
getHeaderVersion (ApiKey 1) 1 = (1, 0)
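
Two protocol-surface updates complete the change: supportedApiVersions now advertises Produce (ApiKey 0) for versions 0 through 2 rather than 2 only, and getHeaderVersion gains the v0/v1 entries. A client is expected to intersect the advertised range with its own and use the highest common version; a minimal sketch of that negotiation (the negotiate helper is illustrative, not part of this codebase):

import Data.Int (Int16)

-- Mirrors ApiVersionV0: an (apiKey, minVersion, maxVersion) triple.
data ApiVersionRange = ApiVersionRange
  { apiKey     :: Int16
  , minVersion :: Int16
  , maxVersion :: Int16
  }

-- Highest version both sides support, if the ranges overlap at all.
negotiate :: (Int16, Int16) -> ApiVersionRange -> Maybe Int16
negotiate (clientMin, clientMax) r
  | lo <= hi  = Just hi
  | otherwise = Nothing
  where
    lo = max clientMin (minVersion r)
    hi = min clientMax (maxVersion r)

-- e.g. negotiate (0, 9) (ApiVersionRange 0 0 2) == Just 2   -- Produce v2
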
(Diffs for the remaining 2 changed files are not shown.)
