Skip to content

Commit

Permalink
typing
Browse files Browse the repository at this point in the history
  • Loading branch information
gregjhogan committed Oct 15, 2019
1 parent 768fdf7 commit b64d6fa
Showing 1 changed file with 35 additions and 28 deletions.
63 changes: 35 additions & 28 deletions python/uds.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import time
import struct
from typing import NamedTuple, List
from enum import IntEnum
from queue import Queue, Empty
from threading import Thread
Expand Down Expand Up @@ -141,6 +142,12 @@ class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3

class DynamicSourceDefinition(NamedTuple):
data_identifier: int
position: int
memory_size: int
memory_address: int

class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
Expand Down Expand Up @@ -185,7 +192,7 @@ class DTC_SEVERITY_MASK_TYPE(IntEnum):
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0

class CONTROL_OPTION_TYPE(IntEnum):
class CONTROL_PARAMETER_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
Expand Down Expand Up @@ -261,7 +268,7 @@ class InvalidSubFunctioneError(Exception):
}

class UdsClient():
def __init__(self, panda, tx_addr, rx_addr=None, bus=0, timeout=10, debug=False):
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: int=10, debug: bool=False):
self.panda = panda
self.bus = bus
self.tx_addr = tx_addr
Expand All @@ -284,7 +291,7 @@ def __init__(self, panda, tx_addr, rx_addr=None, bus=0, timeout=10, debug=False)
self.can_reader_t.daemon = True
self.can_reader_t.start()

def _isotp_thread(self, debug):
def _isotp_thread(self, debug: bool=False):
try:
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
tx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
Expand Down Expand Up @@ -389,7 +396,7 @@ def _isotp_thread(self, debug):
self.rx_queue.put(None)

# generic uds request
def _uds_request(self, service_type, subfunction=None, data=None):
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
req = bytes([service_type])
if subfunction is not None:
req += bytes([subfunction])
Expand Down Expand Up @@ -441,17 +448,17 @@ def _uds_request(self, service_type, subfunction=None, data=None):
return resp[(1 if subfunction is None else 2):]

# services
def diagnostic_session_control(self, session_type):
def diagnostic_session_control(self, session_type: SESSION_TYPE):
self._uds_request(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)

def ecu_reset(self, reset_type):
def ecu_reset(self, reset_type: RESET_TYPE):
resp = self._uds_request(SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = resp[0]
return power_down_time

def security_access(self, access_type, security_key=None):
def security_access(self, access_type: ACCESS_TYPE, security_key: bytes=None):
request_seed = access_type % 2 != 0
if request_seed and security_key is not None:
raise ValueError('security_key not allowed')
Expand All @@ -462,14 +469,14 @@ def security_access(self, access_type, security_key=None):
security_seed = resp
return security_seed

def communication_control(self, control_type, message_type):
def communication_control(self, control_type: CONTROL_TYPE, message_type: MESSAGE_TYPE):
data = bytes([message_type])
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)

def tester_present(self, ):
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)

def access_timing_parameter(self, timing_parameter_type, parameter_values):
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes=None):
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
read_values = (
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
Expand All @@ -485,16 +492,16 @@ def access_timing_parameter(self, timing_parameter_type, parameter_values):
parameter_values = resp
return parameter_values

def secured_data_transmission(self, data):
def secured_data_transmission(self, data: bytes):
# TODO: split data into multiple input parameters?
resp = self._uds_request(SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp

def control_dtc_setting(self, dtc_setting_type):
def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE):
self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)

def response_on_event(self, response_event_type, store_event, window_time, event_type_record, service_response_record):
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20
# TODO: split record parameters into arrays
Expand All @@ -513,7 +520,7 @@ def response_on_event(self, response_event_type, store_event, window_time, event
"data": resp[2:], # TODO: parse the reset of response
}

def link_control(self, link_control_type, baud_rate_type=None):
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None):
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = bytes([baud_rate_type])
Expand All @@ -524,7 +531,7 @@ def link_control(self, link_control_type, baud_rate_type=None):
data = None
self._uds_request(SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)

def read_data_by_identifier(self, data_identifier_type):
def read_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
Expand All @@ -533,7 +540,7 @@ def read_data_by_identifier(self, data_identifier_type):
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]

def read_memory_by_address(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=1):
def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
Expand All @@ -550,20 +557,20 @@ def read_memory_by_address(self, memory_address, memory_size, memory_address_byt
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp

def read_scaling_data_by_identifier(self, data_identifier_type):
def read_scaling_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:] # TODO: parse the response

def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier):
def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int):
# TODO: support list of identifiers
data = bytes([transmission_mode_type, periodic_data_identifier])
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)

def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1):
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
Expand All @@ -588,14 +595,14 @@ def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_da
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)

def write_data_by_identifier(self, data_identifier_type, data_record):
def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes):
data = struct.pack('!H', data_identifier_type) + data_record
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))

def write_memory_by_address(self, memory_address, memory_size, data_record, memory_address_bytes=4, memory_size_bytes=1):
def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
Expand All @@ -612,11 +619,11 @@ def write_memory_by_address(self, memory_address, memory_size, data_record, memo
data += data_record
self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)

def clear_diagnostic_information(self, dtc_group_type):
def clear_diagnostic_information(self, dtc_group_type: DTC_GROUP_TYPE):
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)

def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF):
def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int=0xFFFFFF, dtc_snapshot_record_num: int=0xFF, dtc_extended_record_num: int=0xFF):
data = b''
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
Expand Down Expand Up @@ -652,23 +659,23 @@ def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_
# TODO: parse response
return resp

def input_output_control_by_identifier(self, data_identifier_type, control_option_record, control_enable_mask_record=b''):
data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record
def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes=b''):
data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]

def routine_control(self, routine_control_type, routine_identifier_type, routine_option_record=b''):
def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes=b''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]

def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
data = bytes([data_format])

if memory_address_bytes < 1 or memory_address_bytes > 4:
Expand All @@ -693,7 +700,7 @@ def request_download(self, memory_address, memory_size, memory_address_bytes=4,

return max_num_bytes # max number of bytes per transfer data request

def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
data = bytes([data_format])

if memory_address_bytes < 1 or memory_address_bytes > 4:
Expand All @@ -718,7 +725,7 @@ def request_upload(self, memory_address, memory_size, memory_address_bytes=4, me

return max_num_bytes # max number of bytes per transfer data request

def transfer_data(self, block_sequence_count, data=b''):
def transfer_data(self, block_sequence_count: int, data: bytes=b''):
data = bytes([block_sequence_count]) + data
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None
Expand Down

0 comments on commit b64d6fa

Please sign in to comment.