Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the close async codepath #2381

Merged
merged 6 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Applications/ConsoleReferenceClient/UAClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,7 @@ public async Task<bool> ConnectAsync(string serverUrl, bool useSecurity = true,
EndpointConfiguration endpointConfiguration = EndpointConfiguration.Create(m_configuration);
ConfiguredEndpoint endpoint = new ConfiguredEndpoint(null, endpointDescription, endpointConfiguration);

#if NET6_0_OR_GREATER
var sessionFactory = TraceableSessionFactory.Instance;
#else
var sessionFactory = DefaultSessionFactory.Instance;
mrsuciu marked this conversation as resolved.
Show resolved Hide resolved
#endif

// Create the session
var session = await sessionFactory.CreateAsync(
Expand Down
2 changes: 1 addition & 1 deletion Libraries/Opc.Ua.Client/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace Opc.Ua.Client
/// <summary>
/// Manages a session with a server.
/// </summary>
public interface ISession : ISessionClient, IDisposable
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IClientBase is already disposable

public interface ISession : ISessionClient
{
#region Events
/// <summary>
Expand Down
8 changes: 4 additions & 4 deletions Libraries/Opc.Ua.Client/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1873,8 +1873,8 @@ public async Task<Dictionary<NodeId, DataDictionary>> LoadDataTypeSystem(NodeId
{
if (StatusCode.IsNotBad(errors[ii].StatusCode))
{
namespaces[((NodeId)referenceNodeIds[ii])] = (string)nameSpaceValues[ii];
}
namespaces[((NodeId)referenceNodeIds[ii])] = (string)nameSpaceValues[ii];
}
else
{
Utils.LogWarning("Failed to load namespace {0}: {1}", namespaceNodeIds[ii], errors[ii]);
Expand Down Expand Up @@ -5646,13 +5646,13 @@ private void ProcessPublishResponse(
// Validate publish time and reject old values.
if (notificationMessage.PublishTime.AddMilliseconds(subscription.CurrentPublishingInterval * subscription.CurrentLifetimeCount) < DateTime.UtcNow)
{
Utils.LogWarning("PublishTime {0} in publish response is too old for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
Utils.LogTrace("PublishTime {0} in publish response is too old for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
}

// Validate publish time and reject old values.
if (notificationMessage.PublishTime > DateTime.UtcNow.AddMilliseconds(subscription.CurrentPublishingInterval * subscription.CurrentLifetimeCount))
{
Utils.LogWarning("PublishTime {0} in publish response is newer than actual time for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
Utils.LogTrace("PublishTime {0} in publish response is newer than actual time for SubscriptionId {1}.", notificationMessage.PublishTime.ToLocalTime(), subscription.Id);
}

// update subscription cache.
Expand Down
2 changes: 1 addition & 1 deletion Libraries/Opc.Ua.Configuration/ApplicationInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ public async Task<bool> CheckApplicationInstanceCertificate(
else
{
var message = new StringBuilder();
message.AppendLine("Thumbprint was explicitly specified in the configuration. ");
message.AppendLine("Thumbprint was explicitly specified in the configuration.");
message.AppendLine("Cannot generate a new certificate.");
throw ServiceResultException.Create(StatusCodes.BadConfigurationError, message.ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ public void Close()
m_client?.Dispose();
}

/// <inheritdoc/>
public Task CloseAsync(CancellationToken ct)
{
Close();
return Task.CompletedTask;
}

/// <summary>
/// The async result class for the Https transport.
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions Stack/Opc.Ua.Core/Stack/Client/ClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,16 @@ public virtual StatusCode Close()
/// <summary>
/// Closes the channel using async call.
/// </summary>
public virtual Task<StatusCode> CloseAsync(CancellationToken ct = default)
public async virtual Task<StatusCode> CloseAsync(CancellationToken ct = default)
{
if (m_channel != null)
{
m_channel.Close();
await m_channel.CloseAsync(ct).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure, that this call can't fail/throw exception

m_channel = null;
}

m_authenticationToken = null;
return Task.FromResult<StatusCode>(StatusCodes.Good);
return StatusCodes.Good;
}

/// <summary>
Expand Down
14 changes: 14 additions & 0 deletions Stack/Opc.Ua.Core/Stack/Client/UaChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,20 @@ public void Close()
CloseChannel();
}

/// <summary>
/// Closes any existing secure channel.
/// </summary>
public async Task CloseAsync(CancellationToken ct)
{
if (m_uaBypassChannel != null)
{
await m_uaBypassChannel.CloseAsync(ct).ConfigureAwait(false);
return;
}

CloseChannel();
}

/// <summary>
/// Begins an asynchronous operation to close the secure channel.
/// </summary>
Expand Down
41 changes: 41 additions & 0 deletions Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,47 @@ public void Close(int timeout)
Shutdown(StatusCodes.BadConnectionClosed);
}

/// <summary>
/// Closes a connection with the server (async).
/// </summary>
public async Task CloseAsync(int timeout)
{
WriteOperation operation = InternalClose(timeout);

// wait for the close to succeed.
if (operation != null)
{
try
{
await operation.EndAsync(timeout, false).ConfigureAwait(false);
}
catch (ServiceResultException e)
{
switch (e.StatusCode)
{
case StatusCodes.BadRequestInterrupted:
case StatusCodes.BadSecureChannelClosed:
{
break;
}

default:
{
Utils.LogWarning(e, "ChannelId {0}: Could not gracefully close the channel. Reason={1}", ChannelId, e.Result.StatusCode);
break;
}
}
}
catch (Exception e)
{
Utils.LogError(e, "ChannelId {0}: Could not gracefully close the channel.", ChannelId);
}
}

// shutdown.
Shutdown(StatusCodes.BadConnectionClosed);
}

/// <summary>
/// Sends a request to the server.
/// </summary>
Expand Down
27 changes: 25 additions & 2 deletions Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryTransportChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace Opc.Ua.Bindings
/// </summary>
public class UaSCUaBinaryTransportChannel : ITransportChannel, IMessageSocketChannel
{
private const int kChannelCloseDefault = 1_000;

#region Constructors
/// <summary>
/// Create a transport channel from a message socket factory.
Expand Down Expand Up @@ -223,7 +225,7 @@ public void Reconnect(ITransportWaitingConnection connection)
{
try
{
channel.Close(1000);
channel.Close(kChannelCloseDefault);
}
catch (Exception e)
{
Expand Down Expand Up @@ -277,13 +279,34 @@ public void Close()
{
if (m_channel != null)
{
m_channel.Close(1000);
m_channel.Close(kChannelCloseDefault);
m_channel = null;
}
}
}
}

/// <summary>
/// Closes the secure channel (async).
/// </summary>
/// <exception cref="ServiceResultException">Thrown if any communication error occurs.</exception>
public async Task CloseAsync(CancellationToken ct)
{
UaSCUaBinaryClientChannel channel = null;
lock (m_lock)
{
if (m_channel != null)
{
channel = m_channel;
m_channel = null;
}
}
if (channel != null)
{
await channel.CloseAsync(kChannelCloseDefault, ct).ConfigureAwait(false);
}
}

/// <summary>
/// Begins an asynchronous operation to close the secure channel.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions Stack/Opc.Ua.Core/Stack/Transport/ITransportChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ IAsyncResult BeginOpen(
/// <exception cref="ServiceResultException">Thrown if any communication error occurs.</exception>
void Close();

/// <summary>
/// Closes the secure channel (async).
/// </summary>
/// <exception cref="ServiceResultException">Thrown if any communication error occurs.</exception>
Task CloseAsync(CancellationToken ct);

/// <summary>
/// Begins an asynchronous operation to close the secure channel.
/// </summary>
Expand Down
4 changes: 3 additions & 1 deletion Tests/Opc.Ua.Client.Tests/ClientFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ public async Task<EndpointDescriptionCollection> GetEndpoints(Uri url)

using (var client = DiscoveryClient.Create(url, endpointConfiguration))
{
return await client.GetEndpointsAsync(null).ConfigureAwait(false);
var result = await client.GetEndpointsAsync(null).ConfigureAwait(false);
await client.CloseAsync().ConfigureAwait(false);
return result;
}
}

Expand Down
37 changes: 35 additions & 2 deletions Tests/Opc.Ua.Client.Tests/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public async Task GetEndpointsAsync()

using (var client = DiscoveryClient.Create(ServerUrl, endpointConfiguration))
{
Endpoints = await client.GetEndpointsAsync(null).ConfigureAwait(false);
Endpoints = await client.GetEndpointsAsync(null, CancellationToken.None).ConfigureAwait(false);
var statusCode = await client.CloseAsync(CancellationToken.None).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, statusCode);

TestContext.Out.WriteLine("Endpoints:");
foreach (var endpoint in Endpoints)
{
Expand Down Expand Up @@ -177,6 +180,9 @@ public async Task FindServersAsync()
using (var client = DiscoveryClient.Create(ServerUrl, endpointConfiguration))
{
var servers = await client.FindServersAsync(null).ConfigureAwait(false);
var statusCode = await client.CloseAsync(CancellationToken.None).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, statusCode);

foreach (var server in servers)
{
TestContext.Out.WriteLine("{0}", server.ApplicationName);
Expand All @@ -202,6 +208,9 @@ public async Task FindServersOnNetworkAsync()
try
{
var response = await client.FindServersOnNetworkAsync(null, 0, 100, null, CancellationToken.None).ConfigureAwait(false);
var statusCode = await client.CloseAsync(CancellationToken.None).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, statusCode);

foreach (ServerOnNetwork server in response.Servers)
{
TestContext.Out.WriteLine("{0}", server.ServerName);
Expand Down Expand Up @@ -336,11 +345,35 @@ public async Task ConnectAndCloseAsync(string securityPolicy)
var session = await ClientFixture.ConnectAsync(ServerUrl, securityPolicy, Endpoints).ConfigureAwait(false);
Assert.NotNull(session);
Session.SessionClosing += Session_Closing;
var result = await session.CloseAsync(5_000, closeChannel).ConfigureAwait(false);
var result = await session.CloseAsync(5_000, closeChannel, CancellationToken.None).ConfigureAwait(false);
Assert.NotNull(result);
session.Dispose();
}

[Test, Order(202)]
public async Task ConnectAndCloseAsyncReadAfterClose()
{
var securityPolicy = SecurityPolicies.Basic256Sha256;
using (var session = await ClientFixture.ConnectAsync(ServerUrl, securityPolicy, Endpoints).ConfigureAwait(false))
{
Assert.NotNull(session);
Session.SessionClosing += Session_Closing;

var nodeId = new NodeId(Opc.Ua.VariableIds.ServerStatusType_BuildInfo);
var node = await session.ReadNodeAsync(nodeId, CancellationToken.None).ConfigureAwait(false);
var value = await session.ReadValueAsync(nodeId, CancellationToken.None).ConfigureAwait(false);

// keep channel open
var result = await session.CloseAsync(1_000, false).ConfigureAwait(false);
Assert.AreEqual((StatusCode)StatusCodes.Good, result);

await Task.Delay(5_000).ConfigureAwait(false);

var sre = Assert.ThrowsAsync<ServiceResultException>(async () => await session.ReadNodeAsync(nodeId, CancellationToken.None).ConfigureAwait(false));
Assert.AreEqual((StatusCode)StatusCodes.BadSessionIdInvalid, sre.StatusCode);
}
}

[Theory, Order(210)]
public async Task ConnectAndReconnectAsync(bool reconnectAbort, bool useMaxReconnectPeriod)
{
Expand Down