Skip to content

Commit

Permalink
merge latest publishing fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
romanett committed Oct 13, 2024
2 parents ddf6ed1 + 409a51b commit 5c22a0c
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,7 @@ public virtual bool Publish(OperationContext context, Queue<EventFieldList> noti
// go to the next sampling interval.
IncrementSampleTime();

bool moreValuesToPublish = false;
// publish events.
if (m_eventQueueHandler != null)
{
Expand Down Expand Up @@ -1131,21 +1132,29 @@ public virtual bool Publish(OperationContext context, Queue<EventFieldList> noti
if (overflowEvent != null && m_discardOldest)
{
notifications.Enqueue(overflowEvent);
maxNotificationsPerPublish--;
}
uint overflowEventCount = overflowEvent == null ? (uint)0 : 1;
m_eventQueueHandler.Publish(context, notifications, maxNotificationsPerPublish - overflowEventCount);
uint notificationCount = m_eventQueueHandler.Publish(context, notifications, maxNotificationsPerPublish);

moreValuesToPublish = m_eventQueueHandler?.ItemsInQueue > 0;

// place overflow event at the end of the queue if queue is empty.
if (overflowEvent != null && !m_discardOldest && m_eventQueueHandler?.ItemsInQueue == 0)
if (overflowEvent != null && !m_discardOldest)
{
if (notificationCount < maxNotificationsPerPublish)
{
notifications.Enqueue(overflowEvent);
}
else
{
moreValuesToPublish = true;
}
notifications.Enqueue(overflowEvent);
}

Utils.LogTrace(Utils.TraceMasks.OperationDetail, "MONITORED ITEM: Publish(QueueSize={0})", notifications.Count);
}

bool moreValuesToPublish = m_eventQueueHandler?.ItemsInQueue > 0;

// reset state variables.
m_readyToPublish = moreValuesToPublish;
m_readyToTrigger = moreValuesToPublish;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public interface IEventQueueHandler : IDisposable
/// <param name="context"></param>
/// <param name="notifications"></param>
/// <param name="maxNotificationsPerPublish">the maximum number of notifications to enqueue per call</param>
void Publish(OperationContext context, Queue<EventFieldList> notifications, uint maxNotificationsPerPublish);
/// <returns>the number of events that were added to the notification queue</returns>
uint Publish(OperationContext context, Queue<EventFieldList> notifications, uint maxNotificationsPerPublish);
}
/// <summary>
/// Mangages an event queue for usage by a MonitoredItem
Expand Down Expand Up @@ -175,7 +176,7 @@ public virtual void QueueEvent(EventFieldList fields)
/// <param name="context"></param>
/// <param name="notifications"></param>
/// <param name="maxNotificationsPerPublish">the maximum number of notifications to enqueue per call</param>
public void Publish(OperationContext context, Queue<EventFieldList> notifications, uint maxNotificationsPerPublish)
public uint Publish(OperationContext context, Queue<EventFieldList> notifications, uint maxNotificationsPerPublish)
{
uint notificationCount = 0;
while (notificationCount < maxNotificationsPerPublish && m_eventQueue.Dequeue(out EventFieldList fields))
Expand All @@ -193,6 +194,8 @@ public void Publish(OperationContext context, Queue<EventFieldList> notification
}
//if overflow event is placed at the end of the queue only set overflow to false once the queue is empty
m_overflow = m_overflow && m_eventQueue.ItemsInQueue > 0 && !m_discardOldest;

return notificationCount;
}

private bool m_overflow;
Expand Down
79 changes: 75 additions & 4 deletions Tests/Opc.Ua.Server.Tests/MonitoredItemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,74 @@ public void CreateEventMIOverflow()
Assert.That(publishResult.Handle, Is.AssignableTo(typeof(EventQueueOverflowEventState)));
}

[Test]
public void CreateEventMIOverflowMultiplePublish()
{
MonitoredItem monitoredItem = CreateMonitoredItem(true, 2);
Assert.That(monitoredItem, Is.Not.Null);
Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0));

monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null));
monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null));

Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2));


monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null));

Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2));


var result = new Queue<EventFieldList>();
bool moreItems = monitoredItem.Publish(new OperationContext(monitoredItem), result, 2);

Assert.That(moreItems, Is.True);
Assert.That(result, Is.Not.Empty);
Assert.That(result.Count, Is.EqualTo(2));
EventFieldList publishResult = result.LastOrDefault();
Assert.That(publishResult, Is.Not.Null);
Assert.That(publishResult.Handle, Is.AssignableTo(typeof(AuditUrlMismatchEventState)));


var result2 = new Queue<EventFieldList>();
bool moreItems2 = monitoredItem.Publish(new OperationContext(monitoredItem), result2, 2);

Assert.That(moreItems2, Is.False);
Assert.That(result2, Is.Not.Empty);
Assert.That(result2.Count, Is.EqualTo(1));
EventFieldList publishResult2 = result2.FirstOrDefault();
Assert.That(publishResult2, Is.Not.Null);
Assert.That(publishResult2.Handle, Is.AssignableTo(typeof(EventQueueOverflowEventState)));
}

[Test]
public void CreateEventMIOverflowNoDiscard()
{
MonitoredItem monitoredItem = CreateMonitoredItem(true, 2, true);
Assert.That(monitoredItem, Is.Not.Null);
Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(0));

monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null));
monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null));

Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2));


monitoredItem.QueueEvent(new AuditUrlMismatchEventState(null));

Assert.That(monitoredItem.ItemsInQueue, Is.EqualTo(2));


var result = new Queue<EventFieldList>();
monitoredItem.Publish(new OperationContext(monitoredItem), result, 3);

Assert.That(result, Is.Not.Empty);
Assert.That(result.Count, Is.EqualTo(3));
EventFieldList publishResult = result.FirstOrDefault();
Assert.That(publishResult, Is.Not.Null);
Assert.That(publishResult.Handle, Is.AssignableTo(typeof(EventQueueOverflowEventState)));
}


[Test]
public void CreateEventMIPublishPartial()
Expand All @@ -139,17 +207,20 @@ public void CreateEventMIPublishPartial()


var result = new Queue<EventFieldList>();
monitoredItem.Publish(new OperationContext(monitoredItem), result, 2);
bool moreItems = monitoredItem.Publish(new OperationContext(monitoredItem), result, 2);


Assert.That(moreItems, Is.True);
Assert.That(result, Is.Not.Empty);
Assert.That(result.Count, Is.EqualTo(2));
EventFieldList publishResult = result.LastOrDefault();
Assert.That(publishResult, Is.Not.Null);
Assert.That(publishResult.Handle, Is.AssignableTo(typeof(AuditUrlMismatchEventState)));

var result2 = new Queue<EventFieldList>();
monitoredItem.Publish(new OperationContext(monitoredItem), result2, 2);
bool moreItems2 = monitoredItem.Publish(new OperationContext(monitoredItem), result2, 2);

Assert.That(moreItems2, Is.False);
Assert.That(result2, Is.Not.Empty);
Assert.That(result2.Count, Is.EqualTo(1));
EventFieldList publishResult2 = result2.LastOrDefault();
Expand All @@ -159,7 +230,7 @@ public void CreateEventMIPublishPartial()
#endregion

#region private methods
private MonitoredItem CreateMonitoredItem(bool events = false, uint queueSize = 10)
private MonitoredItem CreateMonitoredItem(bool events = false, uint queueSize = 10, bool discardOldest = false)
{
MonitoringFilter filter = events ? new EventFilter() : new MonitoringFilter();

Expand All @@ -186,7 +257,7 @@ private MonitoredItem CreateMonitoredItem(bool events = false, uint queueSize =
null,
1000.0,
queueSize,
false,
discardOldest,
1000
);
}
Expand Down

0 comments on commit 5c22a0c

Please sign in to comment.