Skip to content

Commit

Permalink
Implement the close async codepath (#2381)
Browse files Browse the repository at this point in the history
- CloseAsync still used the 'Close' synchronous function
- remove a duplicate definition of IDisposable on ISession
- reduce the log output when the server publish time is out of sync
  • Loading branch information
mregen authored Nov 17, 2023
1 parent f73fbef commit 973d342
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 18 deletions.
4 changes: 0 additions & 4 deletions Applications/ConsoleReferenceClient/UAClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,7 @@ public async Task<bool> ConnectAsync(string serverUrl, bool useSecurity = true,
EndpointConfiguration endpointConfiguration = EndpointConfiguration.Create(m_configuration);
ConfiguredEndpoint endpoint = new ConfiguredEndpoint(null, endpointDescription, endpointConfiguration);

#if NET6_0_OR_GREATER
var sessionFactory = TraceableSessionFactory.Instance;
#else
var sessionFactory = DefaultSessionFactory.Instance;
#endif

// Create the session
var session = await sessionFactory.CreateAsync(
Expand Down
2 changes: 1 addition & 1 deletion Libraries/Opc.Ua.Client/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace Opc.Ua.Client
/// <summary>
/// Manages a session with a server.
/// </summary>
public interface ISession : ISessionClient, IDisposable
public interface ISession : ISessionClient
{
#region Events
/// <summary>
Expand Down
8 changes: 4 additions & 4 deletions Libraries/Opc.Ua.Client/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1873,8 +1873,8 @@ public async Task<Dictionary<NodeId, DataDictionary>> LoadDataTypeSystem(NodeId
{
if (StatusCode.IsNotBad(errors[ii].StatusCode))
{
namespaces[((NodeId)referenceNodeIds[ii])] = (string)nameSpaceValues[ii];
}
namespaces[((NodeId)referenceNodeIds[ii])] = (string)nameSpaceValues[ii];
}
else
{
Utils.LogWarning("Failed to load namespace {0}: {1}", namespaceNodeIds[ii], errors[ii]);
Expand Down Expand Up @@ -5646,13 +5646,13 @@ private void ProcessPublishResponse(
// Validate publish time and reject old values.
if (notificationMessage.PublishTime.AddMilliseconds(subscription.CurrentPublishingInterval * subscription.CurrentLifetimeCount) < DateTime.UtcNow)
{
Utils.LogWarning("PublishTime {0} in publish response is too old for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
Utils.LogTrace("PublishTime {0} in publish response is too old for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
}

// Validate publish time and reject old values.
if (notificationMessage.PublishTime > DateTime.UtcNow.AddMilliseconds(subscription.CurrentPublishingInterval * subscription.CurrentLifetimeCount))
{
Utils.LogWarning("PublishTime {0} in publish response is newer than actual time for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
Utils.LogTrace("PublishTime {0} in publish response is newer than actual time for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
}

// update subscription cache.
Expand Down
2 changes: 1 addition & 1 deletion Libraries/Opc.Ua.Configuration/ApplicationInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ public async Task<bool> CheckApplicationInstanceCertificate(
else
{
var message = new StringBuilder();
message.AppendLine("Thumbprint was explicitly specified in the configuration. ");
message.AppendLine("Thumbprint was explicitly specified in the configuration.");
message.AppendLine("Cannot generate a new certificate.");
throw ServiceResultException.Create(StatusCodes.BadConfigurationError, message.ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ public void Close()
m_client?.Dispose();
}

/// <inheritdoc/>
public Task CloseAsync(CancellationToken ct)
{
Close();
return Task.CompletedTask;
}

/// <summary>
/// The async result class for the Https transport.
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions Stack/Opc.Ua.Core/Stack/Client/ClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,16 @@ public virtual StatusCode Close()
/// <summary>
/// Closes the channel using async call.
/// </summary>
public virtual Task<StatusCode> CloseAsync(CancellationToken ct = default)
public async virtual Task<StatusCode> CloseAsync(CancellationToken ct = default)
{
if (m_channel != null)
{
m_channel.Close();
await m_channel.CloseAsync(ct).ConfigureAwait(false);
m_channel = null;
}

m_authenticationToken = null;
return Task.FromResult<StatusCode>(StatusCodes.Good);
return StatusCodes.Good;
}

/// <summary>
Expand Down
14 changes: 14 additions & 0 deletions Stack/Opc.Ua.Core/Stack/Client/UaChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,20 @@ public void Close()
CloseChannel();
}

/// <summary>
/// Closes any existing secure channel.
/// </summary>
public async Task CloseAsync(CancellationToken ct)
{
if (m_uaBypassChannel != null)
{
await m_uaBypassChannel.CloseAsync(ct).ConfigureAwait(false);
return;
}

CloseChannel();
}

/// <summary>
/// Begins an asynchronous operation to close the secure channel.
/// </summary>
Expand Down
41 changes: 41 additions & 0 deletions Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,47 @@ public void Close(int timeout)
Shutdown(StatusCodes.BadConnectionClosed);
}

/// <summary>
/// Closes a connection with the server (async).
/// </summary>
public async Task CloseAsync(int timeout)
{
WriteOperation operation = InternalClose(timeout);

// wait for the close to succeed.
if (operation != null)
{
try
{
await operation.EndAsync(timeout, false).ConfigureAwait(false);
}
catch (ServiceResultException e)
{
switch (e.StatusCode)
{
case StatusCodes.BadRequestInterrupted:
case StatusCodes.BadSecureChannelClosed:
{
break;
}

default:
{
Utils.LogWarning(e, "ChannelId {0}: Could not gracefully close the channel. Reason={1}", ChannelId, e.Result.StatusCode);
break;
}
}
}
catch (Exception e)
{
Utils.LogError(e, "ChannelId {0}: Could not gracefully close the channel.", ChannelId);
}
}

// shutdown.
Shutdown(StatusCodes.BadConnectionClosed);
}

/// <summary>
/// Sends a request to the server.
/// </summary>
Expand Down
27 changes: 25 additions & 2 deletions Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryTransportChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace Opc.Ua.Bindings
/// </summary>
public class UaSCUaBinaryTransportChannel : ITransportChannel, IMessageSocketChannel
{
private const int kChannelCloseDefault = 1_000;

#region Constructors
/// <summary>
/// Create a transport channel from a message socket factory.
Expand Down Expand Up @@ -223,7 +225,7 @@ public void Reconnect(ITransportWaitingConnection connection)
{
try
{
channel.Close(1000);
channel.Close(kChannelCloseDefault);
}
catch (Exception e)
{
Expand Down Expand Up @@ -277,13 +279,34 @@ public void Close()
{
if (m_channel != null)
{
m_channel.Close(1000);
m_channel.Close(kChannelCloseDefault);
m_channel = null;
}
}
}
}

/// <summary>
/// Closes the secure channel (async).
/// </summary>
/// <exception cref="ServiceResultException">Thrown if any communication error occurs.</exception>
public async Task CloseAsync(CancellationToken ct)
{
UaSCUaBinaryClientChannel channel = null;
lock (m_lock)
{
if (m_channel != null)
{
channel = m_channel;
m_channel = null;
}
}
if (channel != null)
{
await channel.CloseAsync(kChannelCloseDefault, ct).ConfigureAwait(false);
}
}

/// <summary>
/// Begins an asynchronous operation to close the secure channel.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions Stack/Opc.Ua.Core/Stack/Transport/ITransportChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ IAsyncResult BeginOpen(
/// <exception cref="ServiceResultException">Thrown if any communication error occurs.</exception>
void Close();

/// <summary>
/// Closes the secure channel (async).
/// </summary>
/// <exception cref="ServiceResultException">Thrown if any communication error occurs.</exception>
Task CloseAsync(CancellationToken ct);

/// <summary>
/// Begins an asynchronous operation to close the secure channel.
/// </summary>
Expand Down
4 changes: 3 additions & 1 deletion Tests/Opc.Ua.Client.Tests/ClientFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ public async Task<EndpointDescriptionCollection> GetEndpoints(Uri url)

using (var client = DiscoveryClient.Create(url, endpointConfiguration))
{
return await client.GetEndpointsAsync(null).ConfigureAwait(false);
var result = await client.GetEndpointsAsync(null).ConfigureAwait(false);
await client.CloseAsync().ConfigureAwait(false);
return result;
}
}

Expand Down
37 changes: 35 additions & 2 deletions Tests/Opc.Ua.Client.Tests/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public async Task GetEndpointsAsync()

using (var client = DiscoveryClient.Create(ServerUrl, endpointConfiguration))
{
Endpoints = await client.GetEndpointsAsync(null).ConfigureAwait(false);
Endpoints = await client.GetEndpointsAsync(null, CancellationToken.None).ConfigureAwait(false);
var statusCode = await client.CloseAsync(CancellationToken.None).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, statusCode);

TestContext.Out.WriteLine("Endpoints:");
foreach (var endpoint in Endpoints)
{
Expand Down Expand Up @@ -177,6 +180,9 @@ public async Task FindServersAsync()
using (var client = DiscoveryClient.Create(ServerUrl, endpointConfiguration))
{
var servers = await client.FindServersAsync(null).ConfigureAwait(false);
var statusCode = await client.CloseAsync(CancellationToken.None).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, statusCode);

foreach (var server in servers)
{
TestContext.Out.WriteLine("{0}", server.ApplicationName);
Expand All @@ -202,6 +208,9 @@ public async Task FindServersOnNetworkAsync()
try
{
var response = await client.FindServersOnNetworkAsync(null, 0, 100, null, CancellationToken.None).ConfigureAwait(false);
var statusCode = await client.CloseAsync(CancellationToken.None).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, statusCode);

foreach (ServerOnNetwork server in response.Servers)
{
TestContext.Out.WriteLine("{0}", server.ServerName);
Expand Down Expand Up @@ -336,11 +345,35 @@ public async Task ConnectAndCloseAsync(string securityPolicy)
var session = await ClientFixture.ConnectAsync(ServerUrl, securityPolicy, Endpoints).ConfigureAwait(false);
Assert.NotNull(session);
Session.SessionClosing += Session_Closing;
var result = await session.CloseAsync(5_000, closeChannel).ConfigureAwait(false);
var result = await session.CloseAsync(5_000, closeChannel, CancellationToken.None).ConfigureAwait(false);
Assert.NotNull(result);
session.Dispose();
}

[Test, Order(202)]
public async Task ConnectAndCloseAsyncReadAfterClose()
{
var securityPolicy = SecurityPolicies.Basic256Sha256;
using (var session = await ClientFixture.ConnectAsync(ServerUrl, securityPolicy, Endpoints).ConfigureAwait(false))
{
Assert.NotNull(session);
Session.SessionClosing += Session_Closing;

var nodeId = new NodeId(Opc.Ua.VariableIds.ServerStatusType_BuildInfo);
var node = await session.ReadNodeAsync(nodeId, CancellationToken.None).ConfigureAwait(false);
var value = await session.ReadValueAsync(nodeId, CancellationToken.None).ConfigureAwait(false);

// keep channel open
var result = await session.CloseAsync(1_000, false).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, result);

await Task.Delay(5_000).ConfigureAwait(false);

var sre = Assert.ThrowsAsync<ServiceResultException>(async () => await session.ReadNodeAsync(nodeId, CancellationToken.None).ConfigureAwait(false));
Assert.AreEqual((StatusCode)StatusCodes.BadSessionIdInvalid, sre.StatusCode);
}
}

[Theory, Order(210)]
public async Task ConnectAndReconnectAsync(bool reconnectAbort, bool useMaxReconnectPeriod)
{
Expand Down

0 comments on commit 973d342

Please sign in to comment.