Replies: 7 comments 12 replies
-
Can you provide more info, such as a code snippet and whether you are using Tasks, etc.? Just reporting the stack trace does not help much.
-
Hi, no problem. It may need a little bit of refactoring, but this is how I publish messages. Not sure whether this helps you in any way. My publish method also creates the Producer in case it does not exist. This is done synchronously.

public Task PublishMessage(List<IDataAgentEntity> messages, Type type)
{
    return Task.Run(new Action(async () =>
    {
        try
        {
            //--------------------------------------------------------------------
            // Check if there are any messages to send at all
            //--------------------------------------------------------------------
            if (messages == null || messages.Count == 0)
            {
                Logger.Log(LogLevel.Warning, $"No message to send found when publishing stream messages");
                return;
            }
            //--------------------------------------------------------------------
            // Check if stream is open at all
            //--------------------------------------------------------------------
            if (!OpenOrVerifyConnection())
            {
                Logger.Log(LogLevel.Error, $"Stream is not open during push of type <{type.Name}>");
                return;
            }
            //--------------------------------------------------------------------
            // Try to get attribute from type
            //--------------------------------------------------------------------
            StreamAttribute? streamAttribute = (StreamAttribute?)Attribute.GetCustomAttribute(type, typeof(StreamAttribute));
            //--------------------------------------------------------------------
            // Check if attribute is null
            //--------------------------------------------------------------------
            if (streamAttribute == null)
            {
                Logger.Log(LogLevel.Error, $"Type <{type.Name}> has no StreamAttribute");
                return;
            }
            //--------------------------------------------------------------------
            // Check if given stream exists
            //--------------------------------------------------------------------
            if (Streams.Contains(streamAttribute.StreamName) == false)
            {
                if (Stream.StreamExists(streamAttribute.StreamName).Result == false)
                {
                    Logger.Log(LogLevel.Warning, $"Stream with name <{streamAttribute.StreamName}> does not exist. Create it");
                    Stream.CreateStream(new StreamSpec(streamAttribute.StreamName)
                    {
                        MaxLengthBytes = StreamLengthBytes //10 GBytes
                    }).Wait();
                    Streams.Add(streamAttribute.StreamName);
                }
            }
            //--------------------------------------------------------------------
            // Check if we got a producer for that stream already
            //--------------------------------------------------------------------
            if (!Producers.ContainsKey(streamAttribute.StreamName))
            {
                Logger.Log(LogLevel.Information, $"No producer created for stream <{streamAttribute.StreamName}>");
                IProducer p = Stream.CreateRawProducer(
                    new RawProducerConfig(streamAttribute.StreamName)
                    {
                        Reference = Guid.NewGuid().ToString(),
                        ConfirmHandler = ConfirmMessage,
                        BatchSize = 1
                    }, Logger).Result;
                Producers.Add(streamAttribute.StreamName, p);
                Logger.Log(LogLevel.Information, $"Producer created and added for stream <{streamAttribute.StreamName}>");
            }
            IProducer producer = Producers[streamAttribute.StreamName];
            List<Message> messagesToSend = new List<Message>();
            foreach (IDataAgentEntity message in messages)
            {
                messagesToSend.Add(new Message(Encoding.UTF8.GetBytes(EntityConverter.Serialize(message))));
            }
            Logger.Log(LogLevel.Debug, $"Start publishing message for producer <{streamAttribute.StreamName}>. <{messagesToSend.Count}> message to send. PublishId <{PublishId}>");
            await producer.Send(PublishId, messagesToSend, CompressionType.Gzip);
            Logger.Log(LogLevel.Debug, $"Finished publishing message for producer <{streamAttribute.StreamName}>");
            PublishId++;
        }
        catch (AggregateException agex)
        {
            foreach (var innerException in agex.InnerExceptions)
            {
                Logger.Log(LogLevel.Error, innerException.ToString());
            }
        }
        catch (Exception ex)
        {
            Logger.Log(LogLevel.Error, ex.ToString());
        }
    }));
}
-
Thank you. OK, you are using the sub-entry batch. That helps to understand where to debug.
-
You are using the […] I suspect that your problem is related to thread status. The code you should use:

var producer = await Producer.Create(new ProducerConfig(system, Stream), producerLogger);
Console.WriteLine("Start sending messages ...");
for (ulong zIndex = 0; zIndex < 500000; zIndex++)
{
    var messagesToSend = new List<Message>();
    for (ulong i = 0; i < <<XXX_SUBENTRY_VALUE>>; i++)
    {
        messagesToSend.Add(new Message(new byte[50]));
    }
    await producer.Send(messagesToSend, CompressionType.Gzip).ConfigureAwait(false);
}

Note:
The code looks a bit complex. I am trying to reproduce the issue, without success so far.
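
One detail of the publish method posted earlier in this thread is worth illustrating: `Task.Run(new Action(async () => ...))` turns the lambda into an `async void` delegate, so the returned `Task` completes at the first incomplete `await` rather than when the body finishes. The following is a minimal sketch of that behaviour using only `Task.Delay`; it involves no RabbitMQ types, and the variable names are purely illustrative.

```csharp
using System;
using System.Threading.Tasks;

// Illustrative only: Task.Delay stands in for any awaited I/O such as Send.
var actionBodyDone = false;
var funcBodyDone = false;

// Same shape as the posted method: the async lambda is wrapped in an Action,
// which makes it "async void". Task.Run only sees a delegate that returns
// as soon as the first incomplete await is reached.
Task viaAction = Task.Run(new Action(async () =>
{
    await Task.Delay(500);
    actionBodyDone = true;
}));
await viaAction;
var observedAfterAction = actionBodyDone;  // read immediately after the task "completes"

// Passing the async lambda directly selects the Func<Task> overload,
// so the returned task covers the whole body.
Task viaFunc = Task.Run(async () =>
{
    await Task.Delay(500);
    funcBodyDone = true;
});
await viaFunc;
var observedAfterFunc = funcBodyDone;

Console.WriteLine($"Action wrapper: body finished = {observedAfterAction}");
Console.WriteLine($"Func<Task>:     body finished = {observedAfterFunc}");
```

With the `Action` wrapper a caller that awaits `PublishMessage` would not actually wait for the send, and any exception escaping the `async void` body would not be observable through the returned task.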
-
We've seen this type of exception with the same consequences as well, and I agree with @Gsantomaggio. I'd guess it's a threading problem caused by how the lib is used, and not a problem with the lib itself. There's a lot of deadlock-prone code in the supplied example (sync over async with blocking […]
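
As a rough illustration of the alternative to sync-over-async, the sketch below creates a shared resource lazily with `await` and an async-friendly lock instead of `.Result` or `.Wait()`. It is a sketch under stated assumptions, not the library's API: `CreateProducerAsync` is a hypothetical stand-in for whatever asynchronous factory is used, and a plain string stands in for the producer object.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an asynchronous producer factory.
// It is NOT a RabbitMQ.Stream.Client call.
var created = 0;
async Task<string> CreateProducerAsync(string stream)
{
    await Task.Delay(50);   // simulates network round trips
    created++;              // only ever called while the gate below is held
    return $"producer-for-{stream}";
}

var producers = new Dictionary<string, string>();
var gate = new SemaphoreSlim(1, 1);   // mutual exclusion without blocking a thread

async Task<string> GetOrCreateProducerAsync(string stream)
{
    await gate.WaitAsync();           // asynchronous wait instead of lock/.Wait()
    try
    {
        if (!producers.TryGetValue(stream, out var producer))
        {
            producer = await CreateProducerAsync(stream);   // await, not .Result
            producers.Add(stream, producer);
        }
        return producer;
    }
    finally
    {
        gate.Release();
    }
}

// Ten concurrent callers ask for the same stream; the factory runs once.
var tasks = new List<Task<string>>();
for (var i = 0; i < 10; i++)
{
    tasks.Add(Task.Run(() => GetOrCreateProducerAsync("orders")));
}
string[] results = await Task.WhenAll(tasks);
Console.WriteLine($"created={created}, callers={results.Length}");
```

The gate also serializes the check-then-add on the dictionary, which matters if the publish method can run concurrently from several `Task.Run` calls.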
-
Will close due to inactivity.
-
Hi @Gsantomaggio, I am still struggling with the above […] What I additionally see is that I get a […] Any recommendation for me? Kind regards
-
Describe the bug
While calling the IProducer.Send() method I sporadically encounter a System.AggregateException which terminates my whole application. I am not even able to catch this exception with a try-catch clause.
Maybe this is something for you to look at. I am using version 1.3 (latest) of the Stream library.
CoreCLR Version: 6.0.1523.11507
.NET Version: 6.0.15
Description: The process was terminated due to an unhandled exception.
Exception Info: System.AggregateException: One or more errors occurred. (Operation is not valid due to the current state of the object.)
 ---> System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore1.SignalCompletion()
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore1.SetException(Exception error)
   at RabbitMQ.Stream.Client.ManualResetValueTaskSource1.SetException(Exception error) in /_/RabbitMQ.Stream.Client/Client.cs:line 705
   at RabbitMQ.Stream.Client.Client.<>c__542.<Request>b__54_0(Object valueTaskSource) in /_/RabbitMQ.Stream.Client/Client.cs:line 354
   at System.Threading.CancellationTokenSource.Invoke(Delegate d, Object state, CancellationTokenSource source)
   at System.Threading.CancellationTokenSource.CallbackNode.<>c.<ExecuteCallback>b__9_0(Object s)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
   at System.Threading.CancellationTokenSource.CallbackNode.ExecuteCallback()
   at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
   --- End of inner exception stack trace ---
   at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
   at System.Threading.CancellationTokenSource.NotifyCancellation(Boolean throwOnFirstException)
   at System.Threading.CancellationTokenSource.TimerCallback(Object state)
   at System.Threading.TimerQueueTimer.Fire(Boolean isThreadPool)
   at System.Threading.TimerQueue.FireNextTimers()
Reproduction steps
For me it happens every couple of hours during continuous message sending
Expected behavior
Ideally, either the Send or Confirm method of the Producer would return a negative publish result.
Additional context
No response
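
The shape of the stack trace above (an `InvalidOperationException` from `ManualResetValueTaskSourceCore.SignalCompletion`, wrapped in an `AggregateException` raised while a `CancellationTokenSource` runs its callbacks) can be reproduced with BCL types alone. The sketch below is one plausible reading, assuming the value-task source had already been completed when a timeout callback tried to complete it again; it does not use or claim to mirror the library's internals.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Sources;

// A value-task source that has already been completed, e.g. by a response.
var source = new ManualResetValueTaskSourceCore<int>();
source.SetResult(42);

using var cts = new CancellationTokenSource();
// Timeout-style callback that tries to complete the same source a second time.
cts.Token.Register(() => source.SetException(new TimeoutException()));

var outer = "";
var inner = "";
try
{
    // Cancel() runs the callbacks on this thread and aggregates their exceptions.
    cts.Cancel();
}
catch (AggregateException ex)
{
    outer = ex.GetType().Name;
    inner = ex.InnerException?.GetType().Name ?? "";
}
Console.WriteLine($"{outer} -> {inner}");   // prints "AggregateException -> InvalidOperationException"
```

Here the exception is catchable because `Cancel()` is called directly. In the trace the cancellation is fired by `CancellationTokenSource.TimerCallback` on a timer thread, so no user frame is on that call stack, which would explain why a try-catch around `Send` never sees it.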