Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for the Sagemcom T210-D-r smart meter #110

Merged
merged 1 commit into from
Oct 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dsmr_parser/obis_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
ELECTRICITY_USED_TARIFF_2 = r'\d-\d:1\.8\.2.+?\r\n'
ELECTRICITY_DELIVERED_TARIFF_1 = r'\d-\d:2\.8\.1.+?\r\n'
ELECTRICITY_DELIVERED_TARIFF_2 = r'\d-\d:2\.8\.2.+?\r\n'
CURRENT_REACTIVE_EXPORTED = r'\d-\d:3\.7\.0.+?\r\n'
ELECTRICITY_REACTIVE_IMPORTED_TOTAL = r'\d-\d:3\.8\.0.+?\r\n'
ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1 = r'\d-\d:3\.8\.1.+?\r\n'
ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2 = r'\d-\d:3\.8\.2.+?\r\n'
CURRENT_REACTIVE_IMPORTED = r'\d-\d:4\.7\.0.+?\r\n'
ELECTRICITY_REACTIVE_EXPORTED_TOTAL = r'\d-\d:4\.8\.0.+?\r\n'
ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1 = r'\d-\d:4\.8\.1.+?\r\n'
ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2 = r'\d-\d:4\.8\.2.+?\r\n'
ELECTRICITY_ACTIVE_TARIFF = r'\d-\d:96\.14\.0.+?\r\n'
EQUIPMENT_IDENTIFIER = r'\d-\d:96\.1\.1.+?\r\n'
CURRENT_ELECTRICITY_USAGE = r'\d-\d:1\.7\.0.+?\r\n'
Expand Down
41 changes: 39 additions & 2 deletions dsmr_parser/parsers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging
import re
from binascii import unhexlify

from ctypes import c_ushort

from dlms_cosem.connection import XDlmsApduFactory
from dlms_cosem.protocol.xdlms import GeneralGlobalCipher

from dsmr_parser.objects import MBusObject, CosemObject, ProfileGenericObject
from dsmr_parser.exceptions import ParseError, InvalidChecksumError

Expand All @@ -22,14 +26,15 @@ def __init__(self, telegram_specification, apply_checksum_validation=True):
self.telegram_specification = telegram_specification
self.apply_checksum_validation = apply_checksum_validation

def parse(self, telegram_data):
def parse(self, telegram_data, encryption_key="", authentication_key=""): # noqa: C901
"""
Parse telegram from string to dict.

The telegram str type makes python 2.x integration easier.

:param str telegram_data: full telegram from start ('/') to checksum
('!ABCD') including line endings in between the telegram's lines
:param str encryption_key: encryption key
:param str authentication_key: authentication key
:rtype: dict
:returns: Shortened example:
{
Expand All @@ -43,6 +48,38 @@ def parse(self, telegram_data):
:raises InvalidChecksumError:
"""

if "general_global_cipher" in self.telegram_specification:
if self.telegram_specification["general_global_cipher"]:
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError(
"Looks like a general_global_cipher frame but telegram specification is not matching!")
except Exception:
pass

if self.apply_checksum_validation \
and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data)
Expand Down
30 changes: 30 additions & 0 deletions dsmr_parser/telegram_specifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,33 @@
obis.Q3D_EQUIPMENT_SERIALNUMBER: CosemParser(ValueParser(str)),
},
}


SAGEMCOM_T210_D_R = {
"general_global_cipher": True,
"checksum_support": True,
'objects': {
obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)),
obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)),
obis.ELECTRICITY_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)),

obis.ELECTRICITY_REACTIVE_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_EXPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_REACTIVE_IMPORTED: CosemParser(ValueParser(Decimal)),

obis.ELECTRICITY_EXPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)),

obis.ELECTRICITY_REACTIVE_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_REACTIVE_IMPORTED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.CURRENT_REACTIVE_EXPORTED: CosemParser(ValueParser(Decimal)),
}
}
AUSTRIA_ENERGIENETZE_STEIERMARK = SAGEMCOM_T210_D_R
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
'pyserial>=3,<4',
'pyserial-asyncio<1',
'pytz',
'Tailer==0.4.1'
'Tailer==0.4.1',
'dlms_cosem==21.3.2'
],
entry_points={
'console_scripts': ['dsmr_console=dsmr_parser.__main__:console']
Expand Down
24 changes: 24 additions & 0 deletions test/example_telegrams.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,27 @@
' 25818685\r\n'
'DE0000000000000000000000000000003\r\n'
)

TELEGRAM_SAGEMCOM_T210_D_R = (
'/EST5\\253710000_A\r\n'
'\r\n'
'1-3:0.2.8(50)\r\n'
'0-0:1.0.0(221006155014S)\r\n'
'1-0:1.8.0(006545766*Wh)\r\n'
'1-0:1.8.1(005017120*Wh)\r\n'
'1-0:1.8.2(001528646*Wh)\r\n'
'1-0:1.7.0(000000286*W)\r\n'
'1-0:2.8.0(000000058*Wh)\r\n'
'1-0:2.8.1(000000000*Wh)\r\n'
'1-0:2.8.2(000000058*Wh)\r\n'
'1-0:2.7.0(000000000*W)\r\n'
'1-0:3.8.0(000000747*varh)\r\n'
'1-0:3.8.1(000000000*varh)\r\n'
'1-0:3.8.2(000000747*varh)\r\n'
'1-0:3.7.0(000000000*var)\r\n'
'1-0:4.8.0(003897726*varh)\r\n'
'1-0:4.8.1(002692848*varh)\r\n'
'1-0:4.8.2(001204878*varh)\r\n'
'1-0:4.7.0(000000166*var)\r\n'
'!7EF9\r\n'
)
107 changes: 107 additions & 0 deletions test/test_parse_sagemcom_t210_d_r.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from binascii import unhexlify
from copy import deepcopy

import unittest

from dlms_cosem.exceptions import DecryptionError
from dlms_cosem.protocol.xdlms import GeneralGlobalCipher
from dlms_cosem.security import SecurityControlField, encrypt

from dsmr_parser import telegram_specifications
from dsmr_parser.exceptions import ParseError
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_SAGEMCOM_T210_D_R


class TelegramParserEncryptedTest(unittest.TestCase):
""" Test parsing of a DSML encypted DSMR v5.x telegram. """
DUMMY_ENCRYPTION_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
DUMMY_AUTHENTICATION_KEY = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"

def __generate_encrypted(self, security_suite=0, authenticated=True, encrypted=True):
security_control = SecurityControlField(
security_suite=security_suite, authenticated=authenticated, encrypted=encrypted
)
encryption_key = unhexlify(self.DUMMY_ENCRYPTION_KEY)
authentication_key = unhexlify(self.DUMMY_AUTHENTICATION_KEY)
system_title = "SYSTEMID".encode("ascii")
invocation_counter = int.from_bytes(bytes.fromhex("10000001"), "big")
plain_data = TELEGRAM_SAGEMCOM_T210_D_R.encode("ascii")

encrypted = encrypt(
security_control=security_control,
key=encryption_key,
auth_key=authentication_key,
system_title=system_title,
invocation_counter=invocation_counter,
plain_text=plain_data,
)

full_frame = bytearray(GeneralGlobalCipher.TAG.to_bytes(1, "big", signed=False))
full_frame.extend(len(system_title).to_bytes(1, "big", signed=False))
full_frame.extend(system_title)
full_frame.extend([0x82]) # Length of the following length bytes
# https://github.com/pwitab/dlms-cosem/blob/739f81a58e5f07663a512d4a128851333a0ed5e6/dlms_cosem/a_xdr.py#L33

security_control = security_control.to_bytes()
invocation_counter = invocation_counter.to_bytes(4, "big", signed=False)
full_frame.extend((len(encrypted)
+ len(invocation_counter)
+ len(security_control)).to_bytes(2, "big", signed=False))
full_frame.extend(security_control)
full_frame.extend(invocation_counter)
full_frame.extend(encrypted)

return full_frame

def test_parse(self):
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
result = parser.parse(self.__generate_encrypted().hex(),
self.DUMMY_ENCRYPTION_KEY,
self.DUMMY_AUTHENTICATION_KEY)
self.assertEqual(len(result), 18)

def test_damaged_frame(self):
# If the frame is damaged decrypting fails (crc is technically not needed)
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)

generated = self.__generate_encrypted()
generated[150] = 0x00
generated = generated.hex()

with self.assertRaises(DecryptionError):
parser.parse(generated, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)

def test_plain(self):
# If a plain request is parsed with "general_global_cipher": True it fails
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)

with self.assertRaises(Exception):
parser.parse(TELEGRAM_SAGEMCOM_T210_D_R, self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)

def test_general_global_cipher_not_specified(self):
# If a GGC frame is detected but general_global_cipher is not set it fails
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)
parser = deepcopy(parser) # We do not want to change the module value
parser.telegram_specification['general_global_cipher'] = False

with self.assertRaises(ParseError):
parser.parse(self.__generate_encrypted().hex(), self.DUMMY_ENCRYPTION_KEY, self.DUMMY_AUTHENTICATION_KEY)

def test_only_encrypted(self):
# Not implemented by dlms_cosem
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)

only_auth = self.__generate_encrypted(0, authenticated=False, encrypted=True).hex()

with self.assertRaises(ValueError):
parser.parse(only_auth, self.DUMMY_ENCRYPTION_KEY)

def test_only_auth(self):
# Not implemented by dlms_cosem
parser = TelegramParser(telegram_specifications.SAGEMCOM_T210_D_R)

only_auth = self.__generate_encrypted(0, authenticated=True, encrypted=False).hex()

with self.assertRaises(ValueError):
parser.parse(only_auth, authentication_key=self.DUMMY_AUTHENTICATION_KEY)
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ deps=
pylama
pytest-asyncio
pytest-mock
dlms_cosem
commands=
py.test --cov=dsmr_parser test {posargs}
pylama dsmr_parser test
Expand Down