Skip to content

Commit

Permalink
Implement a system test for DurableSubscriptions
Browse files Browse the repository at this point in the history
Uses the same parameters to validate non Durable vs Durable
  • Loading branch information
Archie-Miller committed Nov 6, 2024
1 parent d2b4aa2 commit 2803dd9
Showing 1 changed file with 188 additions and 2 deletions.
190 changes: 188 additions & 2 deletions Tests/Opc.Ua.Client.Tests/DurableSubscriptionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Configs;
using CommandLine;
using NUnit.Framework;
using Opc.Ua.Server.Tests;
Expand All @@ -55,7 +57,6 @@ public class DurableSubscriptionTest : ClientTestFramework
private readonly string m_subscriptionTestXml = Path.Combine(Path.GetTempPath(), "SubscriptionTest.xml");
public readonly uint MillisecondsPerHour = 3600 * 1000;


#region Test Setup
/// <summary>
/// Set up a Server and a Client instance.
Expand Down Expand Up @@ -98,6 +99,8 @@ override public async Task CreateReferenceServerFixture(
ServerFixture.Config.TransportQuotas.MaxMessageSize = TransportQuotaMaxMessageSize;
ServerFixture.Config.TransportQuotas.MaxByteStringLength =
ServerFixture.Config.TransportQuotas.MaxStringLength = TransportQuotaMaxStringLength;
ServerFixture.Config.ServerConfiguration.MinSessionTimeout = 1000;
ServerFixture.Config.ServerConfiguration.MinSubscriptionLifetime = 1500;
ServerFixture.Config.ServerConfiguration.UserTokenPolicies.Add(new UserTokenPolicy(UserTokenType.UserName));
ServerFixture.Config.ServerConfiguration.UserTokenPolicies.Add(new UserTokenPolicy(UserTokenType.Certificate));
ServerFixture.Config.ServerConfiguration.UserTokenPolicies.Add(
Expand Down Expand Up @@ -133,10 +136,12 @@ public async Task MySetUp()
{
try
{
ClientFixture.SessionTimeout = 1500;
Session = await ClientFixture.ConnectAsync(ServerUrl,
SecurityPolicies.Basic256Sha256,
null,
new UserIdentity("sysadmin", "demo")).ConfigureAwait(false);
Session.DeleteSubscriptionsOnClose = false;
}
catch (Exception e)
{
Expand Down Expand Up @@ -165,7 +170,7 @@ public async Task MySetUp()

#region Tests

[Test, Order(99)]
[Test, Order(100)]
[TestCase(900, 100u, 100u, 10000u, 3600u, 83442u, TestName = "Test Lifetime Over Maximum")]
[TestCase(900, 100u, 100u, 0u, 3600u, 83442u, TestName = "Test Lifetime Zero")]
[TestCase(1200, 100u, 100u, 1u, 1u, 3000u, TestName = "Test Lifetime One")]
Expand Down Expand Up @@ -318,6 +323,187 @@ public void SetSubscriptionDurableFailsWhenSubscriptionDoesNotExist()
id, 1));
}

[Test, Order(200)]
[TestCase(false, TestName = "Validate session close")]
[TestCase(true, TestName = "Validate Transfer")]
public async Task TestSessionTransfer(bool setSubscriptionDurable)
{
int publishingInterval = 100;
uint keepAliveCount = 5;
uint lifetimeCount = 15;
uint requestedHours = 1;
uint expectedHours = 1;
uint expectedLifetime = 36000;

TestableSubscription subscription = new TestableSubscription(Session.DefaultSubscription);

subscription.KeepAliveCount = keepAliveCount;
subscription.LifetimeCount = lifetimeCount;
subscription.PublishingInterval = publishingInterval;
subscription.MinLifetimeInterval = 1500;

subscription.StateChanged += (s, e) => {
TestContext.Out.WriteLine($"StateChanged: {s.Session.SessionId}-{s.Id}-{e.Status}");
};

Assert.True(Session.AddSubscription(subscription));
subscription.Create();

Dictionary<string, NodeId> desiredNodeIds = GetDesiredNodeIds(subscription.Id);
Dictionary<string, object> initialValues = GetValues(desiredNodeIds);

if (setSubscriptionDurable)
{
uint revisedLifetimeInHours = 0;
Assert.True(subscription.SetSubscriptionDurable(requestedHours, out revisedLifetimeInHours));

Assert.AreEqual(expectedHours, revisedLifetimeInHours);

ValidateDataValue(desiredNodeIds, "MaxLifetimeCount", expectedLifetime);
}
else
{
ValidateDataValue(desiredNodeIds, "MaxLifetimeCount", lifetimeCount);
}

List<NodeId> testSet = new List<NodeId>();
testSet.AddRange(GetTestSetFullSimulation(Session.NamespaceUris));
Dictionary<NodeId, List<DateTime>> valueTimeStamps = new Dictionary<NodeId, List<DateTime>>();


List<MonitoredItem> monitoredItemsList = new List<MonitoredItem>();
foreach (NodeId nodeId in testSet)
{
if (nodeId.IdType == IdType.String)
{
valueTimeStamps.Add(nodeId, new List<DateTime>());
MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem) {
StartNodeId = nodeId,
SamplingInterval = 1000,
QueueSize = 100,
};
monitoredItem.Notification += (MonitoredItem item, MonitoredItemNotificationEventArgs e) => {
List<DateTime> list = valueTimeStamps[nodeId];

foreach (var value in item.DequeueValues())
{
list.Add(value.SourceTimestamp);
}
};

monitoredItemsList.Add(monitoredItem);
}
}

DateTime startTime = DateTime.UtcNow;

subscription.AddItems(monitoredItemsList);
subscription.ApplyChanges();
await Task.Delay(2000).ConfigureAwait(false);

Dictionary<string, object> closeValues = GetValues(desiredNodeIds);

SubscriptionCollection subscriptions = new SubscriptionCollection(Session.Subscriptions);
DateTime closeTime = DateTime.UtcNow;
TestContext.Out.WriteLine("Session Id {0} Closed at {1}", Session.SessionId, closeTime);

Session.Close(closeChannel: false);

// Subscription should time out with initial lifetime count
await Task.Delay(3000).ConfigureAwait(false);

DateTime restartTime = DateTime.UtcNow;
ISession transferSession = await ClientFixture.ConnectAsync(ServerUrl,
SecurityPolicies.Basic256Sha256, null,
new UserIdentity("sysadmin", "demo")).ConfigureAwait(false);


var result = await transferSession.TransferSubscriptionsAsync(subscriptions, true);

TestContext.Out.WriteLine("SetSubscriptionDurable = " + setSubscriptionDurable.ToString() +
" Transfer Result " + result.ToString());

Assert.AreEqual(setSubscriptionDurable, result);

if (setSubscriptionDurable)
{
// New Session and Transfer
await Task.Delay(4000).ConfigureAwait(false);

DateTime completionTime = DateTime.UtcNow;

subscription.SetPublishingMode(false);

double tolerance = 1500;

// Validate
int counter = 0;
foreach (KeyValuePair<NodeId, List<DateTime>> pair in valueTimeStamps)
{
DateTime previous = startTime;

for (int index = 0; index < pair.Value.Count; index++)
{
DateTime timestamp = pair.Value[index];

TestContext.Out.WriteLine($"Node: {pair.Key} Index: {index} Time: {timestamp.ToLongTimeString()} Previous: {previous.ToLongTimeString()}");

TimeSpan timeSpan = timestamp - previous;
Assert.Less(Math.Abs(timeSpan.TotalMilliseconds), tolerance,
$"Node: {pair.Key} [{counter}] Index: {index} Timespan {timeSpan.TotalMilliseconds} ");

previous = timestamp;


if (index == pair.Value.Count - 1)
{
TimeSpan finalTimeSpan = completionTime - timestamp;
Assert.Less(Math.Abs(finalTimeSpan.TotalMilliseconds), tolerance);
}
}
}

TestContext.Out.WriteLine("Session StartTime at {0}", startTime);
TestContext.Out.WriteLine("Session Closed at {0}", closeTime);
TestContext.Out.WriteLine("Restart at {0}", restartTime);
TestContext.Out.WriteLine("Completion at {0}", completionTime);

Assert.True(await transferSession.RemoveSubscriptionAsync(subscription));
}
}

private Dictionary<string, object> ValidateDataValue(Dictionary<string, NodeId> nodeIds,
string desiredValue, UInt32 expectedValue)
{
Dictionary<string, object> modifiedValues = GetValues(nodeIds);

DataValue dataValue = modifiedValues[desiredValue] as DataValue;
Assert.IsNotNull(dataValue);
Assert.IsNotNull(dataValue.Value);
Assert.AreEqual(expectedValue, Convert.ToUInt32(dataValue.Value));

return modifiedValues;
}

private void OnMonitoredItemNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
{
try
{
// Log MonitoredItem Notification event
MonitoredItemNotification notification = e.NotificationValue as MonitoredItemNotification;
DateTime localTime = notification.Value.SourceTimestamp.ToLocalTime();
Debug.WriteLine("Notification: {0} \"{1}\" and Value = {2} at [{3}].",
notification.Message.SequenceNumber,
monitoredItem.ResolvedNodeId,
notification.Value,
localTime.ToLongTimeString());
}
catch (Exception ex)
{
Debug.WriteLine("OnMonitoredItemNotification error: {0}", ex.Message);
}
}

#endregion

#region Helpers
Expand Down

0 comments on commit 2803dd9

Please sign in to comment.