-
Notifications
You must be signed in to change notification settings - Fork 370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add callback to delete subscriptions #1656
Conversation
@@ -44,7 +44,6 @@ def __init__(self, internal_server: "InternalServer", aspace: AddressSpace, subm | |||
self.state = SessionState.Created | |||
self.session_id = ua.NodeId(self._counter) | |||
InternalSession._counter += 1 | |||
self.subscriptions = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This didn't appear to be referenced anywhere, so it felt safe to use the subscription service's version
@@ -116,7 +129,14 @@ async def publish_results(self, requestdata=None): | |||
self._publish_cycles_count, self.data.RevisedLifetimeCount) | |||
# FIXME this will never be send since we do not have publish request anyway | |||
await self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout)) | |||
await self.stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using self.stop()
directly was tricky due to the context of the subscription task and stop
trying to directly await the task. Instead, I opted to decouple the stopping of the subscription from the task by setting self._closing
which will immediately terminate the task because it gets checked after exiting this function.
self.aspace, | ||
callback, | ||
request_callback=request_callback, | ||
delete_callback=lambda: self.subscriptions.pop(result.SubscriptionId, None), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively the subscription service could be passed to each internal subscription and the dict could be modified there, but this seemed lighter.
I tested this locally by setting my RevisedLifetimeCount to 0 on my subscription and changing the |
@@ -86,7 +85,13 @@ async def close_session(self, delete_subs=True): | |||
if InternalSession._current_connections < 0: | |||
InternalSession._current_connections = 0 | |||
self.state = SessionState.Closed | |||
await self.delete_subscriptions(self.subscriptions) | |||
await self.delete_subscriptions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of maintaining a list of subscriptions in two places that would need to be updated, store the session ID in the internal subscription so that it can be fetched directly. Makes managing the lists much easier at the expense of a slight efficiency hit on a function that's not called very often.
for id, sub in self.subscription_service.subscriptions.items() | ||
if sub.session_id == self.session_id | ||
] | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would really make a method get_subscription_by_id(id) or similar. but OK maybe this is fine..
Would it make sense to do a new release soon to include these changes? |
As discussed here, we need to clean up some subscription objects when a subscription deletion within
internal_subscription.py
due to an exceededRevisedLifetimeCount
. In this PR, I implement a callback that cleans up the subscription objects in the subscription service that is called when the subscription is stopped in this fashion.