diff --git a/adafruit_atecc/adafruit_atecc.py b/adafruit_atecc/adafruit_atecc.py index 32934cb..18009d8 100755 --- a/adafruit_atecc/adafruit_atecc.py +++ b/adafruit_atecc/adafruit_atecc.py @@ -45,6 +45,16 @@ """ import time from struct import pack + +# Since the board may or may not have access to the typing library we need +# to have this in a try/except to enable type hinting for the IDEs while +# not breaking the runtime on the controller. +try: + from typing import Any, Sized, Optional + from busio import I2C +except ImportError: + pass + from micropython import const from adafruit_bus_device.i2c_device import I2CDevice from adafruit_binascii import hexlify, unhexlify @@ -154,12 +164,15 @@ class ATECC: CircuitPython interface for ATECCx08A Crypto Co-Processor Devices. """ - def __init__(self, i2c_bus, address=_REG_ATECC_DEVICE_ADDR, debug=False): - """Initializes an ATECC device. + def __init__( + self, i2c_bus: I2C, address: int = _REG_ATECC_DEVICE_ADDR, debug: bool = False + ): + """ + Initializes an ATECC device. + :param busio i2c_bus: I2C Bus object. :param int address: Device address, defaults to _ATECC_DEVICE_ADDR. :param bool debug: Library debugging enabled - """ self._debug = debug self._i2cbuf = bytearray(12) @@ -248,7 +261,7 @@ def lock_all_zones(self): self.lock(0) self.lock(1) - def lock(self, zone): + def lock(self, zone: int): """Locks specific ATECC zones. :param int zone: ATECC zone to lock. """ @@ -260,10 +273,13 @@ def lock(self, zone): assert res[0] == 0x00, "Failed locking ATECC!" self.idle() - def info(self, mode, param=None): - """Returns device state information - :param int mode: Mode encoding, see Table 9-26. + def info(self, mode: int, param: Optional[Any] = None) -> bytearray: + """ + Returns device state information + :param int mode: Mode encoding, see Table 9-26. + :param param: Optional parameter + :return: bytearray containing the response """ self.wakeup() if not param: @@ -276,13 +292,15 @@ def info(self, mode, param=None): self.idle() return info_out - def nonce(self, data, mode=0, zero=0x0000): - """Generates a nonce by combining internally generated random number + def nonce(self, data: bytearray, mode: int = 0, zero: int = 0x0000) -> bytearray: + """ + Generates a nonce by combining internally generated random number with an input value. + :param bytearray data: Input value from system or external. :param int mode: Controls the internal RNG and seed mechanism. :param int zero: Param2, see Table 9-35. - + :return: bytearray containing the calculated nonce """ self.wakeup() if mode in (0x00, 0x01): @@ -309,13 +327,15 @@ def nonce(self, data, mode=0, zero=0x0000): self.idle() return calculated_nonce - def counter(self, counter=0, increment_counter=True): - """Reads the binary count value from one of the two monotonic + def counter(self, counter: int = 0, increment_counter: bool = True) -> bytearray: + """ + Reads the binary count value from one of the two monotonic counters located on the device within the configuration zone. The maximum value that the counter may have is 2,097,151. + :param int counter: Device's counter to increment. :param bool increment_counter: Increments the value of the counter specified. - + :return: bytearray with the count """ counter = 0x00 self.wakeup() @@ -331,18 +351,20 @@ def counter(self, counter=0, increment_counter=True): self.idle() return count - def random(self, rnd_min=0, rnd_max=0): - """Generates a random number for use by the system. + def random(self, rnd_min: int = 0, rnd_max: int = 0) -> int: + """ + Generates a random number for use by the system. + :param int rnd_min: Minimum Random value to generate. :param int rnd_max: Maximum random value to generate. - + :return: Random integer """ if rnd_max: rnd_min = 0 if rnd_min >= rnd_max: return rnd_min delta = rnd_max - rnd_min - r = bytes(16) + r = bytearray(16) r = self._random(r) data = 0 for i in enumerate(r): @@ -352,10 +374,12 @@ def random(self, rnd_min=0, rnd_max=0): data = data % delta return data + rnd_min - def _random(self, data): - """Initializes the random number generator and returns. - :param bytearray data: Response buffer. + def _random(self, data: bytearray) -> bytearray: + """ + Initializes the random number generator and returns. + :param bytearray data: Response buffer. + :return: bytearray """ self.wakeup() data_len = len(data) @@ -371,8 +395,9 @@ def _random(self, data): return data # SHA-256 Commands - def sha_start(self): - """Initializes the SHA-256 calculation engine + def sha_start(self) -> bytearray: + """ + Initializes the SHA-256 calculation engine and the SHA context in memory. This method MUST be called before sha_update or sha_digest """ @@ -385,11 +410,13 @@ def sha_start(self): self.idle() return status - def sha_update(self, message): - """Appends bytes to the message. Can be repeatedly called. + def sha_update(self, message: bytes) -> bytearray: + """ + Appends bytes to the message. Can be repeatedly called. + :param bytes message: Up to 64 bytes of data to be included into the hash operation. - + :return: bytearray containing the status """ self.wakeup() self._send_command(OP_SHA, 0x01, 64, message) @@ -400,12 +427,14 @@ def sha_update(self, message): self.idle() return status - def sha_digest(self, message=None): - """Returns the digest of the data passed to the + def sha_digest(self, message: bytearray = None) -> bytearray: + """ + Returns the digest of the data passed to the sha_update method so far. + :param bytearray message: Up to 64 bytes of data to be included into the hash operation. - + :return: bytearray containing the digest """ if not hasattr(message, "append") and message is not None: message = pack("B", message) @@ -422,11 +451,16 @@ def sha_digest(self, message=None): self.idle() return digest - def gen_key(self, key, slot_num, private_key=False): - """Generates a private or public key. + def gen_key( + self, key: bytearray, slot_num: int, private_key: bool = False + ) -> bytearray: + """ + Generates a private or public key. + + :param key: Buffer to put the key into :param int slot_num: ECC slot (from 0 to 4). :param bool private_key: Generates a private key if true. - + :return: The requested key """ assert 0 <= slot_num <= 4, "Provided slot must be between 0 and 4." self.wakeup() @@ -440,11 +474,13 @@ def gen_key(self, key, slot_num, private_key=False): self.idle() return key - def ecdsa_sign(self, slot, message): - """Generates and returns a signature using the ECDSA algorithm. + def ecdsa_sign(self, slot: int, message: bytearray) -> bytearray: + """ + Generates and returns a signature using the ECDSA algorithm. + :param int slot: Which ECC slot to use. :param bytearray message: Message to be signed. - + :return: bytearray containing the signature """ # Load the message digest into TempKey using Nonce (9.1.8) self.nonce(message, 0x03) @@ -453,9 +489,12 @@ def ecdsa_sign(self, slot, message): sig = self.sign(slot) return sig - def sign(self, slot_id): - """Performs ECDSA signature calculation with key in provided slot. + def sign(self, slot_id: int) -> bytearray: + """ + Performs ECDSA signature calculation with key in provided slot. + :param int slot_id: ECC slot containing key for use with signature. + :return: bytearray containing the signature """ self.wakeup() self._send_command(0x41, 0x80, slot_id) @@ -465,8 +504,10 @@ def sign(self, slot_id): self.idle() return signature - def write_config(self, data): - """Writes configuration data to the device's EEPROM. + def write_config(self, data: bytearray): + """ + Writes configuration data to the device's EEPROM. + :param bytearray data: Configuration data to-write """ # First 16 bytes of data are skipped, not writable @@ -476,7 +517,14 @@ def write_config(self, data): continue self._write(0, i // 4, data[i : i + 4]) - def _write(self, zone, address, buffer): + def _write(self, zone: Any, address: int, buffer: bytearray): + """ + Writes to the I2C + + :param Any zone: Zone to send to + :param int address: The address to send to + :param bytearray buffer: The buffer to send + """ self.wakeup() if len(buffer) not in (4, 32): raise RuntimeError("Only 4 or 32-byte writes supported.") @@ -488,7 +536,14 @@ def _write(self, zone, address, buffer): self._get_response(status) self.idle() - def _read(self, zone, address, buffer): + def _read(self, zone: int, address: int, buffer: bytearray): + """ + Reads from the I2C + + :param int zone: Zone to read from + :param int address: The address to read from + :param bytearray buffer: The buffer to read to + """ self.wakeup() if len(buffer) not in (4, 32): raise RuntimeError("Only 4 and 32 byte reads supported") @@ -500,8 +555,12 @@ def _read(self, zone, address, buffer): time.sleep(0.001) self.idle() - def _send_command(self, opcode, param_1, param_2=0x00, data=""): - """Sends a security command packet over i2c. + def _send_command( + self, opcode: int, param_1: int, param_2: int = 0x00, data: Sized = "" + ): + """ + Sends a security command packet over i2c. + :param byte opcode: The command Opcode :param byte param_1: The first parameter :param byte param_2: The second parameter, can be two bytes. @@ -534,7 +593,7 @@ def _send_command(self, opcode, param_1, param_2=0x00, data=""): # small sleep time.sleep(0.001) - def _get_response(self, buf, length=None, retries=20): + def _get_response(self, buf: Sized, length: int = None, retries: int = 20) -> int: self.wakeup() if length is None: length = len(buf) @@ -559,7 +618,7 @@ def _get_response(self, buf, length=None, retries=20): return response[1] @staticmethod - def _at_crc(data, length=None): + def _at_crc(data: Sized, length: int = None) -> int: if length is None: length = len(data) if not data or not length: diff --git a/adafruit_atecc/adafruit_atecc_asn1.py b/adafruit_atecc/adafruit_atecc_asn1.py index 834c75b..2c6e352 100755 --- a/adafruit_atecc/adafruit_atecc_asn1.py +++ b/adafruit_atecc/adafruit_atecc_asn1.py @@ -37,9 +37,16 @@ """ import struct + # pylint: disable=invalid-name -def get_signature(signature, data): - """Appends signature data to buffer.""" +def get_signature(signature: bytearray, data: bytearray) -> int: + """ + Appends signature data to buffer. + + :param bytearray signature: The signature to append + :param bytearray data: The buffer to append the signature to + :return: Updated length of the buffer + """ # Signature algorithm data += b"\x30\x0a\x06\x08" # ECDSA with SHA256 @@ -91,8 +98,26 @@ def get_signature(signature, data): # pylint: disable=too-many-arguments -def get_issuer_or_subject(data, country, state_prov, locality, org, org_unit, common): - """Appends issuer or subject, if they exist, to data.""" +def get_issuer_or_subject( + data: bytearray, + country: str, + state_prov: str, + locality: str, + org: str, + org_unit: str, + common: str, +): + """ + Appends issuer or subject, if they exist, to data. + + :param bytearray data: buffer to append to + :param str country: The country to append to the buffer + :param str state_prov: The state/province to append to the buffer + :param str locality: The locality to append to the buffer + :param str org: The organization to append to the buffer + :param str org_unit: The organizational unit to append to the buffer + :param str common: The common data to append to the buffer + """ if country: get_name(country, 0x06, data) if state_prov: @@ -107,11 +132,14 @@ def get_issuer_or_subject(data, country, state_prov, locality, org, org_unit, co get_name(common, 0x03, data) -def get_name(name, obj_type, data): - """Appends ASN.1 string in form: set -> seq -> objid -> string +def get_name(name: str, obj_type: int, data: bytearray) -> int: + """ + Appends ASN.1 string in form: set -> seq -> objid -> string + :param str name: String to append to buffer. :param int obj_type: Object identifier type. :param bytearray data: Buffer to write to. + :return: Length of the updated buffer """ # ASN.1 SET data += b"\x31" + struct.pack("B", len(name) + 9) @@ -126,15 +154,24 @@ def get_name(name, obj_type, data): return len(name) + 11 -def get_version(data): - """Appends X.509 version to data.""" +def get_version(data: bytearray) -> None: + """ + Appends X.509 version to data. + + :param bytearray data: Buffer to append the version to + """ # If no extensions are present, but a UniqueIdentifier # is present, the version SHOULD be 2 (value is 1) [4-1-2] data += b"\x02\x01\x00" -def get_sequence_header(length, data): - """Appends sequence header to provided data.""" +def get_sequence_header(length: int, data: bytearray) -> None: + """ + Appends sequence header to provided data. + + :param int length: Length of the buffer + :param bytearray data: The buffer + """ data += b"\x30" if length > 255: data += b"\x82" @@ -145,8 +182,13 @@ def get_sequence_header(length, data): data += length_byte -def get_public_key(data, public_key): - """Appends public key subject and object identifiers.""" +def get_public_key(data: bytearray, public_key: bytearray) -> None: + """ + Appends public key subject and object identifiers. + + :param bytearray data: buffer + :param bytearray public_key: Public key to append + """ # Subject: Public Key data += b"\x30" + struct.pack("B", (0x59) & 0xFF) + b"\x30\x13" # Object identifier: EC Public Key @@ -157,9 +199,12 @@ def get_public_key(data, public_key): data += public_key -def get_signature_length(signature): - """Return length of ECDSA signature. +def get_signature_length(signature: bytearray) -> int: + """ + Return length of ECDSA signature. + :param bytearray signature: Signed SHA256 hash. + :return: length of ECDSA signature. """ r = signature[0] s = signature[32] @@ -182,8 +227,13 @@ def get_signature_length(signature): return 21 + r_len + s_len -def get_sequence_header_length(seq_header_len): - """Returns length of SEQUENCE header.""" +def get_sequence_header_length(seq_header_len: int) -> int: + """ + Returns length of SEQUENCE header. + + :param int seq_header_len: Sequence header length + :return: Length of the sequence header + """ if seq_header_len > 255: return 4 if seq_header_len > 127: @@ -191,8 +241,21 @@ def get_sequence_header_length(seq_header_len): return 2 -def issuer_or_subject_length(country, state_prov, city, org, org_unit, common): - """Returns total length of provided certificate information.""" +def issuer_or_subject_length( + country: str, state_prov: str, city: str, org: str, org_unit: str, common: str +) -> int: + """ + Returns total length of provided certificate information. + + :param str country: Country of certificate + :param str state_prov: State/province of certificate + :param str city: City of certificate + :param str org: Organization of certificate + :param str org_unit: Organization unit of certificate + :param str common: Common data of certificate + :raises: TypeError if return value is 0 + :return: Total length of provided certificate information. + """ tot_len = 0 if country: tot_len += 11 + len(country) diff --git a/adafruit_atecc/adafruit_atecc_cert_util.py b/adafruit_atecc/adafruit_atecc_cert_util.py index da0d2e3..fcfc8c2 100755 --- a/adafruit_atecc/adafruit_atecc_cert_util.py +++ b/adafruit_atecc/adafruit_atecc_cert_util.py @@ -38,6 +38,7 @@ """ from adafruit_binascii import b2a_base64 import adafruit_atecc.adafruit_atecc_asn1 as asn1 +from adafruit_atecc.adafruit_atecc import ATECC class CSR: @@ -45,7 +46,8 @@ class CSR: :param adafruit_atecc atecc: ATECC module. :param slot_num: ATECC module slot (from 0 to 4). - :param bool private_key: Generate a new private key in selected slot? + :param bool private_key: Generate a new private + key in selected slot? :param str country: 2-letter country code. :param str state_prov: State or Province name, :param str city: City name. @@ -56,7 +58,15 @@ class CSR: # pylint: disable=too-many-arguments, too-many-instance-attributes def __init__( - self, atecc, slot_num, private_key, country, state_prov, city, org, org_unit + self, + atecc: ATECC, + slot_num: int, + private_key: bool, + country: str, + state_prov: str, + city: str, + org: str, + org_unit: str, ): self._atecc = atecc self.private_key = private_key @@ -71,13 +81,13 @@ def __init__( self._cert = None self._key = None - def generate_csr(self): + def generate_csr(self) -> bytearray: """Generates and returns a certificate signing request.""" self._csr_begin() csr = self._csr_end() return csr - def _csr_begin(self): + def _csr_begin(self) -> None: """Initializes CSR generation.""" assert 0 <= self._slot <= 4, "Provided slot must be between 0 and 4." # Create a new key @@ -87,7 +97,7 @@ def _csr_begin(self): return self._atecc.gen_key(self._key, self._slot, self.private_key) - def _csr_end(self): + def _csr_end(self) -> bytearray: """Generates and returns a certificate signing request as a base64 string.""" len_issuer_subject = asn1.issuer_or_subject_length(