Skip to content

Commit

Permalink
Implement nullvalue
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Feb 8, 2025
1 parent 2acf7d4 commit 5982fa6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/Pulsar.Client/Common/DTO.fs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type internal Metadata =
EncryptionAlgo: string
OrderingKey: byte[]
ReplicatedFrom: string
NullValue: bool
}

type MessageKey =
Expand Down
1 change: 1 addition & 0 deletions src/Pulsar.Client/Internal/BatchMessageContainer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ module internal BatchHelpers =
if message.Properties.Count > 0 then
for property in message.Properties do
smm.Properties.Add(KeyValue(Key = property.Key, Value = property.Value))
smm.NullValue <- box message.Value |> isNull
Serializer.SerializeWithLengthPrefix(messageStream, smm, PrefixStyle.Fixed32BigEndian)
messageWriter.Write(message.Payload)
struct(BatchDetails(%index, BatchMessageAcker.NullAcker), message, batchItem.Tcs)
Expand Down
1 change: 1 addition & 0 deletions src/Pulsar.Client/Internal/ClientCnx.fs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
EncryptionAlgo = messageMetadata.EncryptionAlgo
OrderingKey = messageMetadata.OrderingKey
ReplicatedFrom = messageMetadata.ReplicatedFrom
NullValue = messageMetadata.NullValue
}

{
Expand Down
18 changes: 12 additions & 6 deletions src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -668,9 +668,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
else
let msgKey = rawMessage.MessageKey
let getValue () =
keyValueProcessor
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, payload) :?> 'T)
|> Option.defaultWith (fun () -> schemaDecodeFunction payload)
if rawMessage.Metadata.NullValue then
Unchecked.defaultof<'T>
else
keyValueProcessor
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, payload) :?> 'T)
|> Option.defaultWith (fun () -> schemaDecodeFunction payload)
let message = Message(
msgId,
payload,
Expand Down Expand Up @@ -1330,9 +1333,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
}
let msgKey = singleMessageMetadata.PartitionKey
let getValue () =
keyValueProcessor
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, singleMessagePayload) :?> 'T)
|> Option.defaultWith (fun() -> schemaDecodeFunction singleMessagePayload)
if singleMessageMetadata.NullValue then
Unchecked.defaultof<'T>
else
keyValueProcessor
|> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, singleMessagePayload) :?> 'T)
|> Option.defaultWith (fun () -> schemaDecodeFunction singleMessagePayload)
let properties =
if singleMessageMetadata.Properties.Count > 0 then
singleMessageMetadata.Properties
Expand Down
3 changes: 2 additions & 1 deletion src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,8 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
Some { PartitionKey = %key; IsBase64Encoded = false }
else
Some { PartitionKey = %Convert.ToBase64String(keyBytes); IsBase64Encoded = true }
MessageBuilder(value, schema.Encode(value), keyObj,
let payloay = if box value |> isNull then Array.empty<byte> else schema.Encode(value)
MessageBuilder(value, payloay, keyObj,
?properties0 = (properties |> Option.ofObj),
?deliverAt = (deliverAt |> Option.ofNullable),
?sequenceId = (sequenceId |> Option.ofNullable),
Expand Down
41 changes: 41 additions & 0 deletions tests/IntegrationTests/Batching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,45 @@ let tests =
Log.Debug("Finished 'Second batch is formed well after the first one'")

}

testTask "Null message with batch get sent if batch size exceeds" {

Log.Debug("Started 'Null message with batch get sent if batch size exceeds'")

let client = getClient()
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
let messagesNumber = 5

let! (consumer: IConsumer<byte[]>) =
client.NewConsumer()
.Topic(topicName)
.ConsumerName("batch consumer")
.SubscriptionName("batch-subscription")
.SubscribeAsync()

let! (producer: IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.ProducerName("batch producer")
.EnableBatching(true)
.BatchingMaxMessages(messagesNumber / 2)
.BatchingMaxBytes(100)
.MaxPendingMessages(1)
.BlockIfQueueFull(true)
.CreateAsync()

for i in 0 .. messagesNumber-1 do
producer.SendAsync(producer.NewMessage(null)) |> ignore

for i in 0 .. messagesNumber-1 do
let! (message: Message<byte[]>) = consumer.ReceiveAsync()
match message.MessageId.Type with
| Batch (index, _) ->
Expect.equal $"Run {i} failed" (i % 2) %index
| _ ->
failwith "Expected batch message"

Log.Debug("Finished 'Null message with batch get sent if batch size exceeds'")

}
]

0 comments on commit 5982fa6

Please sign in to comment.