Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connectors/opcua/high load #1501

Merged
merged 37 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7d433cd
Refactored OPC-UA connector
samson0v Aug 14, 2024
ef3ce44
Merge pull request #1500 from samson0v/connectors/opcua/optimization
samson0v Aug 14, 2024
e9683a0
Added handling for non primitive types
imbeacon Aug 14, 2024
91cb991
Fix for getting connector configuration from main configuration file
imbeacon Aug 15, 2024
2fd84ed
Adjusted non-primitive parsing
imbeacon Aug 15, 2024
9d18a0a
build docker script added. requirement for asyncua added
smatvienko-tb Aug 14, 2024
23c3fc4
Merge pull request #1502 from smatvienko-tb/connectors/opcua/high-load
imbeacon Aug 15, 2024
4f8b1f4
Adjusted batch reading, changed method of reading to get correct proc…
samson0v Aug 15, 2024
fe972e4
docker python12 and cleanup
smatvienko-tb Aug 15, 2024
917860e
Merge pull request #1503 from smatvienko-tb/docker-python12-and-cleanup
imbeacon Aug 15, 2024
db7cc12
Fixed passing arguments
samson0v Aug 15, 2024
7ccf445
Fixed attribute error
samson0v Aug 15, 2024
1d2db94
Merge branch 'master' into connectors/opcua/high-load
samson0v Aug 15, 2024
84a6d02
Added processing primitive types
samson0v Aug 15, 2024
ed0fe1e
Added additional log
samson0v Aug 15, 2024
1442a2a
Added error handling
samson0v Aug 15, 2024
27772d2
Fixed list data converting
samson0v Aug 15, 2024
ca4b0ae
Fixed BadNothingToDo error
samson0v Aug 15, 2024
3a47629
Hardcoded queue parameters
imbeacon Aug 15, 2024
d61788d
Changed polling delay logic
imbeacon Aug 15, 2024
05dfc5b
Added pollPeriodInMillis parameter, renamed scanPeriodInSec parameter…
samson0v Aug 16, 2024
ef8acad
Updated default values in tb_gateway.json
samson0v Aug 16, 2024
58fbf71
Fix for asyncua library installation at runtime
imbeacon Aug 16, 2024
4ed867e
Fix for loosing mapping array
imbeacon Aug 16, 2024
983d570
Reduced sleeping and refactored max payload size processing configura…
imbeacon Aug 16, 2024
2f79b61
Added session timeout for OCPUA client
imbeacon Aug 17, 2024
267128d
Added conversion for scan period
imbeacon Aug 17, 2024
33b8ca2
Changed configuration conversion, fix for opcua canncelled error hand…
imbeacon Aug 17, 2024
b89c4b9
Fix for unintialized gateway main loop messages processing
imbeacon Aug 17, 2024
fb2f036
Fixed getting scan period
samson0v Aug 19, 2024
a5c8534
Renamed scanPeriodInSec to scanPeriodInMillis
samson0v Aug 19, 2024
0d60037
Fixed converting data from subscriptions
samson0v Aug 19, 2024
fa5a005
Fixed error handling in the main loop (BadTooManySessions)
samson0v Aug 19, 2024
d737c8b
Minor performance improving
samson0v Aug 20, 2024
e32b93c
Avoiding received connector config mutating
samson0v Aug 20, 2024
8c480ff
Minor improvements for initialization
imbeacon Aug 21, 2024
a1283de
Small fixes
samson0v Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions build_latest_docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/sh
# Copyright 2024. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -e # exit on any error

# Fetch the current branch name and latest commit ID
BRANCH_NAME=$(git rev-parse --abbrev-ref HEAD | sed 's/[\/]/-/g')
COMMIT_ID=$(git rev-parse --short HEAD)

# Combine them to create a version tag
VERSION_TAG="${BRANCH_NAME}-${COMMIT_ID}"

echo "$(date) Building project with version tag $VERSION_TAG ..."
set -x

# multi arch
DOCKER_CLI_EXPERIMENTAL=enabled \
docker buildx build . -t tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64 -o type=registry

set +x
echo "$(date) Done."
7 changes: 5 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$TARGETPLATFORM python:3.11-slim AS build
FROM --platform=$TARGETPLATFORM python:3.12-slim AS build

ARG TARGETPLATFORM
ARG BUILDPLATFORM
Expand Down Expand Up @@ -29,8 +29,11 @@ RUN mkdir -p /default-config/config /default-config/extensions/ && \
echo "Unsupported platform detected. Trying to use default value...";; \
esac && \
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-host=$DEFAULT_HOST --profile minimal && \
apt-get remove --purge -y \
gcc python3-dev build-essential libssl-dev libffi-dev zlib1g-dev pkg-config && \
apt-get autoremove -y && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* && \
rm -rf /var/lib/apt/lists/* /tmp/* && \
echo '#!/bin/sh\n\
# Main start script\n\
CONF_FOLDER="/thingsboard_gateway/config"\n\
Expand Down
4 changes: 2 additions & 2 deletions thingsboard_gateway/config/tb_gateway.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"enable": false,
"filterFile": "list.json"
},
"maxPayloadSizeBytes": 1024,
"minPackSendDelayMS": 200,
"maxPayloadSizeBytes": 8196,
"minPackSendDelayMS": 50,
"minPackSizeToSend": 500,
"checkConnectorsConfigurationInSeconds": 60,
"handleDeviceRenaming": true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from copy import copy
from copy import deepcopy
import re


Expand All @@ -11,13 +11,16 @@ class BackwardCompatibilityAdapter:
}

def __init__(self, config, logger):
self._config = copy(config)
self._config = deepcopy(config)
self._log = logger

def convert(self):
for node_config in self._config.get('server', {}).get('mapping', []):
mapping_configuration = deepcopy(self._config.get('server', {}).get('mapping', []))
if not mapping_configuration:
return self._config
for node_config in mapping_configuration:
try:
node_config['deviceNodeSource'] = self.get_value_source(node_config['deviceNodePattern'])
node_config['deviceNodeSource'] = self.get_value_source(node_config['deviceNodePattern'], False)

device_type_pattern = node_config.pop('deviceTypePattern', 'default')
device_name_pattern = node_config.pop('deviceNamePattern', None)
Expand Down Expand Up @@ -60,16 +63,18 @@ def convert(self):
self._log.error('Error during conversion: ', e)
self._log.info('Config: ', node_config)

mapping = self._config.get('server', {}).pop('mapping', [])
self._config['mapping'] = mapping
# Removing old mapping section
self._config['server'].pop('mapping')
# Adding new mapping section
self._config['mapping'] = mapping_configuration

return self._config

@staticmethod
def get_value_source(value):
def get_value_source(value, possible_constant=True):
if re.search(r"(ns=\d+;[isgb]=[^}]+)", value):
return 'identifier'
elif re.search(r"\${([A-Za-z.:\\\d]+)}", value):
elif re.search(r"\${([A-Za-z.:\\\d]+)}", value) or not possible_constant:
return 'path'
else:
return 'constant'
1 change: 1 addition & 0 deletions thingsboard_gateway/connectors/opcua/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, path, name, config, converter, converter_for_sub, logger):
'timeseries': [],
'attributes': []
}
self.nodes = []

self.load_values()

Expand Down
162 changes: 95 additions & 67 deletions thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ def __init__(self, gateway, config, connector_type):
self.daemon = True

self.__device_nodes = []
self.__last_poll = 0
self.__next_poll = 0
self.__next_scan = 0

def open(self):
self.__stopped = False
Expand Down Expand Up @@ -203,6 +204,7 @@ async def start_client(self):
try:
self.__client = asyncua.Client(url=self.__opcua_url,
timeout=self.__server_conf.get('timeoutInMillis', 4000) / 1000)
self.__client.session_timeout = self.__server_conf.get('sessionTimeoutInMillis', 3600000)

if self.__server_conf["identity"].get("type") == "cert.PEM":
await self.__set_auth_settings_by_cert()
Expand All @@ -220,41 +222,44 @@ async def start_client(self):
except Exception as e:
self.__log.error("Error on loading type definitions:\n %s", e)

scan_period = self.__server_conf.get('scanPeriodInMillis', 5000) / 1000
poll_period = int(self.__server_conf.get('pollPeriodInMillis', 5000) / 1000)
scan_period = int(self.__server_conf.get('scanPeriodInMillis', 3600000) / 1000)

while not self.__stopped:
if monotonic() - self.__last_poll >= scan_period:
if monotonic() >= self.__next_scan:
self.__next_scan = monotonic() + scan_period
await self.__scan_device_nodes()

if monotonic() >= self.__next_poll:
self.__next_poll = monotonic() + poll_period
await self.__poll_nodes()
self.__last_poll = monotonic()

if not scan_period < 0.2:
await asyncio.sleep(0.2)
else:
await asyncio.sleep(scan_period)
current_time = monotonic()
time_to_sleep = min(self.__next_poll - current_time, self.__next_scan - current_time)
if time_to_sleep > 0:
await asyncio.sleep(time_to_sleep)
except (ConnectionError, BadSessionClosed):
self.__log.warning('Connection lost for %s', self.get_name())
except asyncio.exceptions.TimeoutError:
self.__log.warning('Failed to connect %s', self.get_name())
except asyncio.CancelledError as e:
if self.__stopped:
self.__log.debug('Task was cancelled due to connector stop: %s', e.__str__())
break
else:
self.__log.exception('Task was cancelled: %s', e.__str__())
except UaStatusCodeError as e:
self.__log.error('Error in main loop, trying to reconnect: %s', exc_info=e)
if self.__connected:
await self.__client.disconnect()
self.__connected = False
break
except Exception as e:
self.__log.exception("Error in main loop: %s", e)
break
finally:
if self.__connected:
await self.__client.disconnect()
self.__connected = False
await asyncio.sleep(1)
if self.__stopped:
if self.__connected:
await self.__client.disconnect()
self.__connected = False
await asyncio.sleep(.5)

async def __set_auth_settings_by_cert(self):
try:
Expand Down Expand Up @@ -340,40 +345,6 @@ async def find_node_name_space_index(self, path):
unresolved = path[resolved_level:]
return await self.__find_nodes(unresolved, current_parent_node=parent_node, nodes=resolved)

async def __scan_device_nodes(self):
existing_devices = list(map(lambda dev: dev.name, self.__device_nodes))

scanned_devices = []
for device in self.__config.get('mapping', []):
nodes = await self.find_nodes(device['deviceNodePattern'])
self.__log.debug('Found devices: %s', nodes)

device_names = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceNameExpression'))

for device_name in device_names:
scanned_devices.append(device_name)
if device_name not in existing_devices:
for node in nodes:
converter = self.__load_converter(device)
device_type = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'),
get_first=True)
device_config = {**device, 'device_name': device_name, 'device_type': device_type}
self.__device_nodes.append(
Device(path=node, name=device_name, config=device_config,
converter=converter(device_config, self.__log),
converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get(
'disableSubscriptions',
False) else None, logger=self.__log))
self.__log.info('Added device node: %s', device_name)

for device_name in existing_devices:
if device_name not in scanned_devices:
await self.__reset_nodes(device_name)

self.__log.debug('Device nodes: %s', self.__device_nodes)

async def _get_device_info_by_pattern(self, pattern, get_first=False):
result = []

Expand Down Expand Up @@ -411,17 +382,56 @@ def __convert_sub_data(self):
for section in ('attributes', 'timeseries'):
for node in device.values.get(section, []):
if node.get('id') == sub_node.__str__():
device.converter_for_sub.convert(config={'section': section, 'key': node['key']},
val=data.monitored_item.Value)
device.converter_for_sub.convert({'section': section, 'key': node['key']},
data.monitored_item.Value)
converter_data = device.converter_for_sub.get_data()

if converter_data:
self.__data_to_send.put(*converter_data)
device.converter_for_sub.clear_data()
else:
sleep(.2)
sleep(.05)

async def __poll_nodes(self):
async def __scan_device_nodes(self):
await self._create_new_devices()
await self._load_devices_nodes()

async def _create_new_devices(self):
existing_devices = list(map(lambda dev: dev.name, self.__device_nodes))

scanned_devices = []
for device in self.__config.get('mapping', []):
nodes = await self.find_nodes(device['deviceNodePattern'])
self.__log.debug('Found devices: %s', nodes)

device_names = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceNameExpression'))

for device_name in device_names:
scanned_devices.append(device_name)
if device_name not in existing_devices:
for node in nodes:
converter = self.__load_converter(device)
device_type = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'),
get_first=True)
device_config = {**device, 'device_name': device_name, 'device_type': device_type}
self.__device_nodes.append(
Device(path=node, name=device_name, config=device_config,
converter=converter(device_config, self.__log),
converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get(
'disableSubscriptions',
False) else None, logger=self.__log))

self.__log.info('Added device node: %s', device_name)

for device_name in existing_devices:
if device_name not in scanned_devices:
await self.__reset_nodes(device_name)

self.__log.debug('Device nodes: %s', self.__device_nodes)

async def _load_devices_nodes(self):
for device in self.__device_nodes:
for section in ('attributes', 'timeseries'):
for node in device.values.get(section, []):
Expand All @@ -434,24 +444,25 @@ async def __poll_nodes(self):
qualified_path = await self.find_node_name_space_index(path)
if len(qualified_path) == 0:
if node.get('valid', True):
self.__log.warning('Node not found; device: %s, key: %s, path: %s', device.name,
self.__log.warning('Node not found; device: %s, key: %s, path: %s',
device.name,
node['key'], node['path'])
await self.__reset_node(node)
continue
elif len(qualified_path) > 1:
self.__log.warning(
'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', device.name,
'Multiple matching nodes found; device: %s, key: %s, path: %s; %s',
device.name,
node['key'], node['path'], qualified_path)
node['qualified_path'] = qualified_path[0]
path = qualified_path[0]

var = await self.__client.nodes.root.get_child(path)

device.nodes.append({'var': var, 'key': node['key'], 'section': section})

if (node.get('valid') is None or
(node.get('valid') and self.__server_conf.get('disableSubscriptions', False))):
value = await var.read_data_value()
device.converter.convert(config={'section': section, 'key': node['key']}, val=value)

if (not self.__server_conf.get('disableSubscriptions', False)
and not node.get('sub_on', False)
and not self.__stopped):
Expand All @@ -462,14 +473,16 @@ async def __poll_nodes(self):
node['subscription'] = handle
node['sub_on'] = True
node['id'] = var.nodeid.to_string()
self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", device.name,
self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s",
device.name,
node['key'], node['id'])

node['valid'] = True
except ConnectionError:
raise
except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid,
BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError, UaStatusCodeErrors,
BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError,
UaStatusCodeErrors,
BadWaitingForInitialData):
if node.get('valid', True):
self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name,
Expand All @@ -484,23 +497,38 @@ async def __poll_nodes(self):
self.__log.exception(e)
await self.__reset_node(node)

converter_data = device.converter.get_data()
if converter_data:
self.__data_to_send.put(*converter_data)
async def __poll_nodes(self):
all_nodes = [node_config['var'] for device in self.__device_nodes for node_config in device.nodes]

if len(all_nodes) > 0:
values = await self.__client.read_attributes(all_nodes)

converted_nodes_count = 0
for device in self.__device_nodes:
nodes_count = len(device.nodes)
device_values = values[converted_nodes_count:converted_nodes_count + nodes_count]
converted_nodes_count += nodes_count
device.converter.convert(device.nodes, device_values)
converter_data = device.converter.get_data()
if converter_data:
self.__data_to_send.put(*converter_data)

device.converter.clear_data()
device.converter.clear_data()

self.__log.debug('Converted nodes values count: %s', converted_nodes_count)
else:
self.__log.info('No nodes to poll')

def __send_data(self):
while not self.__stopped:
if not self.__data_to_send.empty():
data = self.__data_to_send.get()
self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1
self.__log.debug(data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
self.__log.debug('Data to ThingsBoard %s', data)
self.__log.debug('Count data packs to ThingsBoard: %s', self.statistics['MessagesSent'])
else:
sleep(.2)
sleep(.05)

async def get_shared_attr_node_id(self, path, result={}):
try:
Expand Down
Loading
Loading