Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Server] Durable Subscriptions #2683

Merged
merged 66 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
2bf1dce
Durable sub proto (#8)
romanett Jul 25, 2024
865255f
enable in reference server
romanett Jul 25, 2024
8275e8e
limit max dequeued values per publish per monitoredItem to maxNotific…
romanett Jul 25, 2024
77403da
Merge branch 'DurableSubscription' into DurableSubscriptionTest
romanett Jul 25, 2024
beab2c8
remove old MonitoredItemQueue
romanett Jul 27, 2024
251ab95
implement application configuration builder
romanett Jul 27, 2024
2b5be5d
Add Tests and benchmarks. Fix DataValueQueue ItemsInQueue count
romanett Jul 28, 2024
02a666b
Merge branch 'DurableSubscription' into DurableSubscriptionTest
romanett Jul 28, 2024
a53b152
add Tests for queue size changes
romanett Jul 30, 2024
9a83057
enable durable subscriptons in reference server
romanett Jul 30, 2024
59d4ae9
add Queue Handler Interface, sketch out queue handler tests
romanett Aug 1, 2024
bcc6dc1
Merge remote-tracking branch 'origin/master' into DurableSubscription…
romanett Aug 2, 2024
3a4eaf0
add tests for event queue
romanett Aug 2, 2024
41bcccb
add tests for data value queue
romanett Aug 2, 2024
51ff979
Merge branch 'OPCFoundation:master' into DurableSubscriptionTest
romanett Aug 7, 2024
ead9349
Fix bug in MonitoredItem QueueEvent
romanett Aug 7, 2024
4b16f7c
adress review feedback (typos, comments, small fixes)
romanett Aug 17, 2024
f26b701
Merge branch 'OPCFoundation:master' into DurableSubscriptionTest
romanett Aug 17, 2024
f62d2e5
- fix Queue Size Changes of DataValueQueue
romanett Aug 21, 2024
4faaa3b
clean up claculations
romanett Aug 21, 2024
7c05382
fix tests
romanett Aug 22, 2024
239f2d3
add more tests
romanett Aug 22, 2024
a22cb62
add more tests
romanett Aug 22, 2024
3ce2bd8
fix ctt issues
romanett Aug 24, 2024
7a4822e
fix publishing
romanett Aug 24, 2024
13580da
rename files, change status code
romanett Aug 24, 2024
dadb38d
try to fix git file move detection
romanett Aug 24, 2024
02958c3
Merge branch 'OPCFoundation:master' into DurableSubscriptionTest
romanett Aug 24, 2024
b566cc8
move interfaces to classes
romanett Aug 27, 2024
95ed2be
merge origin/master
romanett Sep 6, 2024
241c014
improve publish operation to avoid discards with large MI Queue sizes
romanett Sep 6, 2024
057e3dc
improve publishing operation to avoid discarding values when a large …
romanett Sep 15, 2024
ff23131
implement changes in custom DataChangeMonitoredItem
romanett Sep 15, 2024
079a46e
Merge remote-tracking branch 'origin/master' into DurableSubscription…
romanett Sep 15, 2024
b0470c8
fix publishing operation
romanett Sep 15, 2024
69dc860
improve readability
romanett Sep 15, 2024
5378bc6
fix null reference exception
romanett Sep 15, 2024
de6a8cf
fix revisedLifeTimeCalcualations
romanett Sep 16, 2024
90b82a1
apply changed configuration in Reference Server
romanett Sep 16, 2024
6acc761
Investigation Only!
Archie-Miller Sep 17, 2024
82725ee
only place overflow event at the end of the event queue
romanett Sep 19, 2024
a61c6dd
merge lastest publishing improvements
romanett Sep 21, 2024
3eedc57
fix typo
romanett Sep 21, 2024
c803ef4
fix test
romanett Sep 21, 2024
1f94f7d
Merge branch 'DurableSubscriptionTest' into Add_System_Tests
Archie-Miller Sep 23, 2024
6d9d408
Revert Test
Archie-Miller Sep 23, 2024
352eeb8
Initial SetDurableSubscription Tests
Archie-Miller Sep 24, 2024
c525651
Update comment as to Keep Alive * 3 operations
Archie-Miller Oct 3, 2024
15aa944
address review feedback
romanett Oct 8, 2024
3bf04ab
use archies code
romanett Oct 8, 2024
fad3b9d
fix notification count increment
romanett Oct 8, 2024
f65b46b
Merge branch 'master' into LargeQueuePublishing
mregen Oct 9, 2024
2ffc444
fix publishing of Events
romanett Oct 9, 2024
5c88ca3
Merge branch 'LargeQueuePublishing' of https://github.com/romanett/UA…
romanett Oct 9, 2024
c982aa1
fix event publishing add tests
romanett Oct 9, 2024
9811dea
Merge remote-tracking branch 'origin/master' into DurableSubscription…
romanett Oct 10, 2024
639af8b
adress review feedback
romanett Oct 10, 2024
a1ba0cc
merge updates
romanett Oct 10, 2024
409a51b
fix edge case
romanett Oct 10, 2024
a1ddadf
fix build
romanett Oct 10, 2024
ddf6ed1
fix tests
romanett Oct 10, 2024
5c22a0c
merge latest publishing fixes
romanett Oct 13, 2024
f4af3f4
fixes
romanett Oct 13, 2024
77d089b
add Integration Tests
romanett Oct 13, 2024
3f9c135
increase version to 1.5.376
romanett Oct 15, 2024
3b7d4a2
Merge branch 'develop/durable' into DurableSubscriptionTest
mregen Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@
<MaxPublishRequestCount>20</MaxPublishRequestCount>
<MaxSubscriptionCount>100</MaxSubscriptionCount>
<MaxEventQueueSize>10000</MaxEventQueueSize>
<DurableSubscriptionsEnabled>true</DurableSubscriptionsEnabled>
<MaxDurableNotificationQueueSize>10000</MaxDurableNotificationQueueSize>
<MaxDurableEventQueueSize>10000</MaxDurableEventQueueSize>
<MaxDurableSubscriptionLifetimeInHours>10</MaxDurableSubscriptionLifetimeInHours>

<!-- see https://opcfoundation-onlineapplications.org/profilereporting/ for list of available profiles -->
<ServerProfileArray>
Expand Down
ThomasNehring marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/* ========================================================================
* Copyright (c) 2005-2024 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/


using Opc.Ua.Server;

namespace Quickstarts.Servers
{
/// <summary>
/// A factory for <see cref="IDataChangeMonitoredItemQueue"> and </see> <see cref="IEventMonitoredItemQueue"/>
/// </summary>
public class DurableMonitoredItemQueueFactory : IMonitoredItemQueueFactory
{
/// <inheritdoc/>
public bool SupportsDurableQueues => true;
/// <inheritdoc/>
public IDataChangeMonitoredItemQueue CreateDataChangeQueue(bool createDurable)
{
//use durable queue only if MI is durable
if (createDurable)
{

return new DurableDataChangeMonitoredItemQueue(createDurable);
}
else
{
return new DataChangeMonitoredItemQueue(createDurable);
}

}

/// <inheritdoc/>
public IEventMonitoredItemQueue CreateEventQueue(bool createDurable)
{
//use durable queue only if MI is durable
if (createDurable)
{

return new DurableEventMonitoredItemQueue(createDurable);
}
else
{
return new EventMonitoredItemQueue(createDurable);
}
}

/// <inheritdoc/>
public void Dispose()
{
//only needed for managed resources
}
}

public class DurableEventMonitoredItemQueue : EventMonitoredItemQueue
{
/// <summary>
/// Creates an empty queue.
/// </summary>
public DurableEventMonitoredItemQueue(bool createDurable) : base(false)
{
IsDurable = createDurable;
}

#region Public Methods
/// <inheritdoc/>
public override bool IsDurable { get; }
#endregion
}

public class DurableDataChangeMonitoredItemQueue : DataChangeMonitoredItemQueue
{
/// <summary>
/// Creates an empty queue.
/// </summary>
public DurableDataChangeMonitoredItemQueue(bool createDurable) : base(false)
{
IsDurable = createDurable;
}

#region Public Methods
/// <inheritdoc/>
public override bool IsDurable { get; }
#endregion
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public MemoryBufferMonitoredItem(
double samplingInterval,
uint queueSize,
bool discardOldest,
double minimumSamplingInterval)
double minimumSamplingInterval,
bool createDurable)
:
base(
server,
Expand All @@ -80,7 +81,8 @@ public MemoryBufferMonitoredItem(
samplingInterval,
queueSize,
discardOldest,
minimumSamplingInterval)
minimumSamplingInterval,
createDurable)
{
m_offset = offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ protected override ServiceResult CreateMonitoredItem(
DiagnosticsMasks diagnosticsMasks,
TimestampsToReturn timestampsToReturn,
MonitoredItemCreateRequest itemToCreate,
bool createDurable,
ref long globalIdCounter,
out MonitoringFilterResult filterError,
out IMonitoredItem monitoredItem)
Expand All @@ -288,6 +289,7 @@ protected override ServiceResult CreateMonitoredItem(
diagnosticsMasks,
timestampsToReturn,
itemToCreate,
createDurable,
ref globalIdCounter,
out filterError,
out monitoredItem);
Expand Down Expand Up @@ -366,7 +368,8 @@ protected override ServiceResult CreateMonitoredItem(
timestampsToReturn,
itemToCreate.MonitoringMode,
itemToCreate.RequestedParameters.ClientHandle,
samplingInterval);
samplingInterval,
createDurable);

// report the initial value.
datachangeItem.QueueValue(initialValue, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,8 @@ public MemoryBufferMonitoredItem CreateDataChangeItem(
TimestampsToReturn timestampsToReturn,
MonitoringMode monitoringMode,
uint clientHandle,
double samplingInterval)
double samplingInterval,
bool createDurable)

/*
ISystemContext context,
Expand Down Expand Up @@ -479,7 +480,8 @@ public MemoryBufferMonitoredItem CreateDataChangeItem(
samplingInterval,
0,
false,
0);
0,
createDurable);

/*
MemoryBufferMonitoredItem monitoredItem = new MemoryBufferMonitoredItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ protected override MasterNodeManager CreateMasterNodeManager(IServerInternal ser
return new MasterNodeManager(server, configuration, null, nodeManagers.ToArray());
}

protected override IMonitoredItemQueueFactory CreateMonitoredItemQueueFactory(IServerInternal server, ApplicationConfiguration configuration)
{
if (configuration?.ServerConfiguration?.DurableSubscriptionsEnabled == true)
{
return new Servers.DurableMonitoredItemQueueFactory();
}
return new MonitoredItemQueueFactory();
}

/// <summary>
/// Loads the non-configurable properties for the application.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public DataChangeMonitoredItem(
/// Constructs a new instance.
/// </summary>
public DataChangeMonitoredItem(
IMonitoredItemQueueFactory monitoredItemQueueFactory,
MonitoredNode source,
uint id,
uint attributeId,
Expand All @@ -94,6 +95,7 @@ public DataChangeMonitoredItem(
bool alwaysReportUpdates)
{
m_source = source;
m_monitoredItemQueueFactory = monitoredItemQueueFactory;
m_id = id;
m_attributeId = attributeId;
m_indexRange = indexRange;
Expand All @@ -108,6 +110,7 @@ public DataChangeMonitoredItem(
m_readyToTrigger = false;
m_resendData = false;
m_queue = null;
m_queueSize = queueSize;
m_filter = filter;
m_range = 0;
m_alwaysReportUpdates = alwaysReportUpdates;
Expand All @@ -119,7 +122,7 @@ public DataChangeMonitoredItem(

if (queueSize > 1)
{
m_queue = new MonitoredItemQueue(id);
m_queue = new DataChangeQueueHandler(id, false, m_monitoredItemQueueFactory);
m_queue.SetQueueSize(queueSize, discardOldest, diagnosticsMasks);
m_queue.SetSamplingInterval(samplingInterval);
}
Expand Down Expand Up @@ -241,6 +244,7 @@ public ServiceResult Modify(
m_diagnosticsMasks = diagnosticsMasks;
m_timestampsToReturn = timestampsToReturn;
m_clientHandle = clientHandle;
m_queueSize = queueSize;

// subtract the previous sampling interval.
long oldSamplingInterval = (long)(m_samplingInterval * TimeSpan.TicksPerMillisecond);
Expand Down Expand Up @@ -278,7 +282,7 @@ public ServiceResult Modify(
{
if (m_queue == null)
{
m_queue = new MonitoredItemQueue(m_id);
m_queue = new DataChangeQueueHandler(m_id, false, m_monitoredItemQueueFactory);
}

m_queue.SetQueueSize(queueSize, discardOldest, diagnosticsMasks);
Expand Down Expand Up @@ -497,7 +501,7 @@ public ServiceResult GetCreateResult(out MonitoredItemCreateResult result)

if (m_queue != null)
{
result.RevisedQueueSize = m_queue.QueueSize;
result.RevisedQueueSize = m_queueSize;
}

return ServiceResult.Good;
Expand All @@ -520,7 +524,7 @@ public ServiceResult GetModifyResult(out MonitoredItemModifyResult result)

if (m_queue != null)
{
result.RevisedQueueSize = m_queue.QueueSize;
result.RevisedQueueSize = m_queueSize;
}

return ServiceResult.Good;
Expand Down Expand Up @@ -668,6 +672,8 @@ public DataChangeFilter DataChangeFilter
get { return m_filter; }
}

public bool IsDurable => false;

/// <summary>
/// Increments the sample time to the next interval.
/// </summary>
Expand Down Expand Up @@ -697,7 +703,7 @@ private void IncrementSampleTime()
/// <summary>
/// Called by the subscription to publish any notification.
/// </summary>
public bool Publish(OperationContext context, Queue<MonitoredItemNotification> notifications, Queue<DiagnosticInfo> diagnostics)
public bool Publish(OperationContext context, Queue<MonitoredItemNotification> notifications, Queue<DiagnosticInfo> diagnostics, uint maxNotificationsPerPublish)
{
lock (m_lock)
{
Expand All @@ -715,23 +721,21 @@ public bool Publish(OperationContext context, Queue<MonitoredItemNotification> n
IncrementSampleTime();
}

// update publish flag.
m_readyToPublish = false;
m_readyToTrigger = false;

// check if queuing is enabled.
if (m_queue != null && (!m_resendData || m_queue.ItemsInQueue != 0))
{
DataValue value = null;
ServiceResult error = null;

while (m_queue.Publish(out value, out error))
uint notificationCount = 0;
while (notificationCount < maxNotificationsPerPublish && m_queue.PublishSingleValue(out value, out error))
{
Publish(context, value, error, notifications, diagnostics);
notificationCount++;

if (m_resendData)
{
m_readyToPublish = m_queue.ItemsInQueue > 0;
break;
}
}
Expand All @@ -741,10 +745,14 @@ public bool Publish(OperationContext context, Queue<MonitoredItemNotification> n
Publish(context, m_lastValue, m_lastError, notifications, diagnostics);
}

bool moreValuesToPublish = m_queue?.ItemsInQueue > 0;

// update flags
m_readyToPublish = moreValuesToPublish;
m_readyToTrigger = moreValuesToPublish;
m_resendData = false;

return true;
return moreValuesToPublish;
}
}

Expand Down Expand Up @@ -834,10 +842,16 @@ private void Publish(

diagnostics.Enqueue(diagnosticInfo);
}

public void Dispose()
{
//only durable queues need to be disposed
}
#endregion

#region Private Fields
private readonly object m_lock = new object();
private IMonitoredItemQueueFactory m_monitoredItemQueueFactory;
private MonitoredNode m_source;
private ISubscription m_subscription;
private uint m_id;
Expand All @@ -850,7 +864,8 @@ private void Publish(
private DiagnosticsMasks m_diagnosticsMasks;
private uint m_clientHandle;
private double m_samplingInterval;
private MonitoredItemQueue m_queue;
private DataChangeQueueHandler m_queue;
private uint m_queueSize;
private DataChangeFilter m_filter;
private double m_range;
private MonitoringMode m_monitoringMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public DataChangeMonitoredItem CreateDataChangeItem(
bool alwaysReportUpdates)
{
DataChangeMonitoredItem monitoredItem = new DataChangeMonitoredItem(
Server.MonitoredItemQueueFactory,
this,
monitoredItemId,
attributeId,
Expand Down
Loading
Loading