Replies: 5 comments 11 replies
-
Shout if this is better suited for |
-
Hi @ricsiLT, |
-
Broker version would be helpful. |
-
Also, |
-
RabbitMQ stream client: 1.6.0
Sample code, consumer:
_logger.LogInformation("Starting test");
var myReference = "reference-for-bugreport";
var streamName = "syst_mip_segments";
var channel = Channel.CreateUnbounded<ulong>();
var ip = "";
var username = "";
var password = "";
var vhost = "";
ushort initialCredits = 10;
var addressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse(ip), 5551));
var streamSystem = await StreamSystem.Create(
    new StreamSystemConfig
    {
        VirtualHost = vhost,
        ClientProvidedName = "testing-stream-system",
        UserName = username,
        Password = password,
        AddressResolver = addressResolver,
        Endpoints = new List<EndPoint> { addressResolver.EndPoint },
        Ssl = new SslOption
        {
            Enabled = true,
            CertificateValidationCallback = (_, _, _, _) => true
        }
    }
);
IOffsetType offset;
try
{
    offset = new OffsetTypeOffset(await streamSystem.QueryOffset(myReference, streamName));
}
catch (Exception ex)
{
    offset = new OffsetTypeFirst();
}
var rawConsumerConfig = new RawConsumerConfig(streamName)
{
    ClientProvidedName = "raw-consumer-ricsi",
    Reference = myReference,
    OffsetSpec = offset,
    MessageHandler = async (_, messageContext, message) =>
    {
        // _logger.LogInformation("Message size: {MessageSize}", message.Data.Contents.Length);
        await channel.Writer.WriteAsync(messageContext.Offset, stoppingToken).ConfigureAwait(false);
    },
    InitialCredits = initialCredits
};
var consumer = await streamSystem.CreateRawConsumer(rawConsumerConfig);
var count = 0;
_logger.LogInformation("Starting consumption");
while (!stoppingToken.IsCancellationRequested)
{
    var retrievedOffset = await channel.Reader.ReadAsync(stoppingToken);
    count++;
    if (count == 1000)
    {
        // await consumer.StoreOffset(retrievedOffset);
        _logger.LogInformation("Stored {RetrievedOffset}", retrievedOffset);
        count = 0;
    }
}
consumer.Dispose();
await streamSystem.Close();
_logger.LogInformation("Done");
-
Describe the bug
Our situation, generalized:
We have multiple publishers.
Due to the nature of the signals they produce, they can be considered "sparse": publishing is sparse both in time (say, once every 10 s) and in quantity (a few messages at a time).
We noticed that publishing this way is suboptimal: consuming such a stream takes a long time because it consists of a lot of small chunks.
After experimenting, we saw that if we read the same stream into memory and republish it in batches of 1k messages, we can consume it in seconds instead of minutes (for a few million messages).
I can give you code examples if you want, but essentially this is a "complaint" about slowness, by orders of magnitude, of .Publish(message) vs. .Publish(messages).
We wanted to ask whether there are any recommendations for such a use case (any settings we could use, etc.). Currently we try to batch messages on the producing side, but they can only be batched so much there due to system requirements (we only batch messages that are close together in time).
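To make the "batch only if messages are close together, timewise" idea concrete, here is a minimal sketch of a size- and time-bounded batcher. It is not part of the RabbitMQ stream client: TimeBoundedBatcher, maxBatchSize, maxDelay and the flush delegate are all hypothetical names, and the delegate stands in for whatever batch publish call the producer exposes (the .Publish(messages) above).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not a RabbitMQ.Stream.Client type): collects items and
// hands them to `flush` when either maxBatchSize items have accumulated or
// maxDelay has elapsed since the first item of the batch became available.
public sealed class TimeBoundedBatcher<T>
{
    private readonly Channel<T> _queue = Channel.CreateUnbounded<T>();
    private readonly int _maxBatchSize;
    private readonly TimeSpan _maxDelay;
    private readonly Func<IReadOnlyList<T>, Task> _flush;

    public TimeBoundedBatcher(int maxBatchSize, TimeSpan maxDelay,
        Func<IReadOnlyList<T>, Task> flush)
    {
        _maxBatchSize = maxBatchSize;
        _maxDelay = maxDelay;
        _flush = flush;
    }

    // Called by the code that currently publishes one message at a time.
    public ValueTask AddAsync(T item, CancellationToken ct = default) =>
        _queue.Writer.WriteAsync(item, ct);

    // Signals that no more items will be added; RunAsync flushes the rest and returns.
    public void Complete() => _queue.Writer.Complete();

    public async Task RunAsync(CancellationToken ct = default)
    {
        var reader = _queue.Reader;
        // Wait (without a deadline) until at least one item is available.
        while (await reader.WaitToReadAsync(ct).ConfigureAwait(false))
        {
            var batch = new List<T>(_maxBatchSize);
            // The batching window starts now and is capped at maxDelay.
            using var window = CancellationTokenSource.CreateLinkedTokenSource(ct);
            window.CancelAfter(_maxDelay);
            try
            {
                while (batch.Count < _maxBatchSize)
                {
                    batch.Add(await reader.ReadAsync(window.Token).ConfigureAwait(false));
                }
            }
            catch (OperationCanceledException) when (!ct.IsCancellationRequested)
            {
                // The window elapsed: fall through and flush what was collected.
            }
            catch (ChannelClosedException)
            {
                // Complete() was called and the queue is drained: flush the remainder.
            }

            if (batch.Count > 0)
            {
                await _flush(batch).ConfigureAwait(false);
            }
        }
    }
}
```

In this sketch maxDelay is the extra latency a message can pick up while waiting for neighbours, so it would have to fit within the system requirements mentioned above. If the outer token is cancelled mid-batch, the items already collected are not flushed; a real implementation would need to decide what to do with them.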
Reproduction steps
Expected behavior
Reading should be fast enough to saturate a 100 Mbit/s link.
Additional context
No response