forked from OceanDataTools/openrvdas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnmea_checksum_transform.py
123 lines (92 loc) · 3.91 KB
/
nmea_checksum_transform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import logging
# For efficient checksum code
from functools import reduce
from operator import xor
############################
def get_message_str(source):
    """Return the text between the first '$' and the first '*' in source.

    Returns None when either delimiter is absent. If the first '*' sits
    before the first '$', the slice is empty and '' is returned.
    """
    dollar_pos = source.find('$')
    star_pos = source.find('*')

    # str.find() reports a missing delimiter as -1.
    if dollar_pos < 0 or star_pos < 0:
        return None

    return source[dollar_pos + 1:star_pos]
def get_checksum_value(source):
    """Return the checksum text that follows the first '*' in source.

    Leading/trailing whitespace is removed from the result, so a record
    that still carries its line terminator ('\\r\\n') yields just the
    checksum digits rather than digits plus terminator.

    Returns None when source contains no '*'.
    """
    _, star, tail = source.partition('*')

    # partition() leaves the separator slot empty when '*' is not found.
    if not star:
        return None

    return tail.strip()
def compute_checksum(source):
    """Return the checksum of source as an uppercase hex string.

    The checksum is the XOR of the code points of every character in
    source, formatted with at least two hex digits. An empty string has
    checksum '00'.
    """
    # The initial value 0 is the XOR identity; it keeps reduce() from
    # raising TypeError when source is empty.
    return '%02X' % reduce(xor, (ord(c) for c in source), 0)
################################################################################
class NMEAChecksumTransform:
    """
    NMEAChecksumTransform checks the integrity/completeness of a record by
    confirming whether or not the checksum matches. If the checksum matches,
    transform() returns the record; otherwise it reports an error (to the
    writer if one was given, else via logging) and returns None.
    """
    DEFAULT_ERROR_MESSAGE = 'Bad checksum for record: '

    def __init__(self, checksum_optional=False, error_message=DEFAULT_ERROR_MESSAGE, writer=None):
        """
        checksum_optional — If True, then pass record along even if checksum is missing
        error_message — Optional custom error message; if None then use DEFAULT_ERROR_MESSAGE
        writer — Optional error writer; if None, errors are reported with logging.warning()
        """
        self.checksum_optional = checksum_optional

        # Honor the documented contract: an explicit None means "use the
        # default". Storing None would make send_error_message() fail when
        # it appends the record to the message.
        if error_message is None:
            error_message = self.DEFAULT_ERROR_MESSAGE
        self.error_message = error_message

        self.writer = writer

        # Tries to utilize the write() method of writer, which it
        # would only have if the object is a Writer. Send it a 'None'
        # record, which all writers should ignore. If this fails,
        # writer is set to None.
        if writer:
            try:
                writer.write(None)
            except AttributeError:
                logging.error('Writer passed to NMEAChecksumTransform has no '
                              'write() method!')
                self.writer = None

    def transform(self, record):
        """
        Checks if computed checksum matches parsed checksum. If it does,
        returns the record; otherwise calls send_error_message() and
        returns None.

        record - the record in question. Empty/None records yield None
                 silently; non-string records are logged and yield None.
        """
        if not record:
            return None

        if not isinstance(record, str):
            logging.warning('NMEAChecksumTransform passed non-string record '
                            '(type %s): %s', type(record), record)
            return None

        checksum_value = get_checksum_value(record)

        if checksum_value is None:
            if self.checksum_optional:
                return record
            # If here, checksum is not optional and does not exist
            self.send_error_message(record, 'No checksum found in record ')
            return None

        message_str = get_message_str(record)

        # A record can contain '*' without a usable '$...*' body (no '$',
        # or nothing between the delimiters). There is nothing to verify
        # in that case, so report it as a bad record instead of passing
        # None/'' on to compute_checksum().
        if not message_str:
            self.send_error_message(record)
            return None

        # Compare ignoring surrounding whitespace (e.g. a trailing line
        # terminator) and hex-digit case; compute_checksum() produces
        # uppercase hex.
        if compute_checksum(message_str) == checksum_value.strip().upper():
            return record

        # If here, then checksum exists but didn't match
        self.send_error_message(record)
        return None

    def send_error_message(self, record, message=None):
        """
        Send error to writer if one exists, otherwise log it as a warning.

        record - the record with the error; it is appended to the message
        message - optional custom message. If None (or empty), then use
                  self.error_message
        """
        error_message = (message or self.error_message) + record

        if self.writer:
            self.writer.write(error_message)
        else:
            logging.warning(error_message)