From dd4f2f15e00a38b3ae45354e40163895528a4542 Mon Sep 17 00:00:00 2001 From: Andreas Wallner Date: Sat, 20 Jul 2024 21:30:04 +0200 Subject: [PATCH 1/2] Fix broken f-strings --- pyftdi/eeprom.py | 2 +- pyftdi/ftdi.py | 2 +- pyftdi/tests/backend/ftdivirt.py | 2 +- pyftdi/tests/backend/mpsse.py | 2 +- pyftdi/tests/backend/usbvirt.py | 2 +- pyftdi/usbtools.py | 4 ++-- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pyftdi/eeprom.py b/pyftdi/eeprom.py index 7e692504..148d972f 100644 --- a/pyftdi/eeprom.py +++ b/pyftdi/eeprom.py @@ -478,7 +478,7 @@ def set_property(self, name: str, value: Union[str, int, bool], mobj = match(r'cbus_func_(\d)', name) if mobj: if not isinstance(value, str): - raise ValueError("'{name}' should be specified as a string") + raise ValueError(f"'{name}' should be specified as a string") self._set_cbus_func(int(mobj.group(1)), value, out) self._dirty.add(name) return diff --git a/pyftdi/ftdi.py b/pyftdi/ftdi.py index 26e1da6a..ad8bd7d6 100644 --- a/pyftdi/ftdi.py +++ b/pyftdi/ftdi.py @@ -2267,7 +2267,7 @@ def _set_baudrate(self, baudrate: int, constrain: bool) -> int: raise FtdiError('Unable to set baudrate') return actual except USBError as exc: - raise FtdiError('UsbError: {exc}') from exc + raise FtdiError(f'UsbError: {exc}') from exc def _set_frequency(self, frequency: float) -> float: """Convert a frequency value into a TCK divisor setting""" diff --git a/pyftdi/tests/backend/ftdivirt.py b/pyftdi/tests/backend/ftdivirt.py index b188f6ac..35358660 100644 --- a/pyftdi/tests/backend/ftdivirt.py +++ b/pyftdi/tests/backend/ftdivirt.py @@ -1107,7 +1107,7 @@ def _build_eeprom(cls, version, eeprom: Optional[dict]) -> bytearray: try: size = cls.EXT_EEPROMS[model.lower()] except KeyError as exc: - raise ValueError('Unsupported EEPROM model: {model}') \ + raise ValueError(f'Unsupported EEPROM model: {model}') \ from exc data = eeprom.get('data', b'') if version in cls.INT_EEPROMS: diff --git a/pyftdi/tests/backend/mpsse.py b/pyftdi/tests/backend/mpsse.py index 38ea142b..d6bffe28 100644 --- a/pyftdi/tests/backend/mpsse.py +++ b/pyftdi/tests/backend/mpsse.py @@ -20,7 +20,7 @@ class VirtMpsseTracer(FtdiMpsseTracer): def __init__(self, port: 'VirtFtdiPort', version: int): super().__init__(version) - self.log = getLogger('pyftdi.virt.mpsse.{port.iface}') + self.log = getLogger(f'pyftdi.virt.mpsse.{port.iface}') self._port = port def _get_engine(self, iface: int): diff --git a/pyftdi/tests/backend/usbvirt.py b/pyftdi/tests/backend/usbvirt.py index 319087df..a99e671d 100644 --- a/pyftdi/tests/backend/usbvirt.py +++ b/pyftdi/tests/backend/usbvirt.py @@ -353,7 +353,7 @@ def get_virtual_ftdi(self, bus: int, address: int) -> VirtFtdi: for dev in self._devices: if dev.bus == bus and dev.address == address: return dev.ftdi - raise ValueError('No FTDI @ {bus:address}') + raise ValueError(f'No FTDI @ {bus:address}') def enumerate_devices(self) -> VirtDevice: yield from self._devices diff --git a/pyftdi/usbtools.py b/pyftdi/usbtools.py index b8a593fd..4af310a3 100644 --- a/pyftdi/usbtools.py +++ b/pyftdi/usbtools.py @@ -319,7 +319,7 @@ def parse_url(cls, urlstr: str, scheme: str, if len(cvendors) == 1: vendor = cvendors.pop() if vendor not in pdict: - vstr = '0x{vendor:04x}' if vendor is not None else '?' + vstr = f'0x{vendor:04x}' if vendor is not None else '?' raise UsbToolsError(f'Vendor ID {vstr} not supported') if not product: cproducts = {candidate[1] for candidate in candidates @@ -327,7 +327,7 @@ def parse_url(cls, urlstr: str, scheme: str, if len(cproducts) == 1: product = cproducts.pop() if product not in pdict[vendor].values(): - pstr = '0x{vendor:04x}' if product is not None else '?' + pstr = f'0x{vendor:04x}' if product is not None else '?' raise UsbToolsError(f'Product ID {pstr} not supported') devdesc = UsbDeviceDescriptor(vendor, product, desc.bus, desc.address, desc.sn, idx, desc.description) From 1c50cee2915183fb57eaa8f8cd82117a6cbbea07 Mon Sep 17 00:00:00 2001 From: Andreas Wallner Date: Sat, 20 Jul 2024 21:51:19 +0200 Subject: [PATCH 2/2] Add SPI CS active high support --- pyftdi/spi.py | 34 +++++++++++++++++++++++--------- pyftdi/tests/spi.py | 47 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 10 deletions(-) diff --git a/pyftdi/spi.py b/pyftdi/spi.py index e7c98821..ef58798f 100644 --- a/pyftdi/spi.py +++ b/pyftdi/spi.py @@ -152,11 +152,8 @@ def set_mode(self, mode: int, cs_hold: Optional[int] = None) -> None: self._cs_hold = cs_hold self._cpol = bool(mode & 0x2) self._cpha = bool(mode & 0x1) - cs_clock = 0xFF & ~((int(not self._cpol) and SpiController.SCK_BIT) | - SpiController.DO_BIT) - cs_select = 0xFF & ~((SpiController.CS_BIT << self._cs) | - (int(not self._cpol) and SpiController.SCK_BIT) | - SpiController.DO_BIT) + cs_clock = self._controller._cs_idle | (int(self._cpol) and SpiController.SCK_BIT) + cs_select = cs_clock ^ (SpiController.CS_BIT << self._cs) self._cs_prolog = bytes([cs_clock, cs_select]) self._cs_epilog = bytes([cs_select] + [cs_clock] * int(cs_hold)) @@ -210,6 +207,14 @@ def cs(self) -> int: :return: the /CS index (starting from 0) """ return self._cs + + @property + def cs_active_high(self) -> bool: + """Is the CS active high? + + :return: True if it's active high, False if active low + """ + (self._controller._cs_idle & (SpiController.CS_BIT << self._cs)) == 0 @property def mode(self) -> int: @@ -352,6 +357,7 @@ def __init__(self, cs_count: int = 1, turbo: bool = True): self._spi_ports = [] self._spi_dir = 0 self._spi_mask = self.SPI_BITS + self._cs_idle = 0xff def configure(self, url: Union[str, UsbDevice], **kwargs: Mapping[str, Any]) -> None: @@ -378,6 +384,8 @@ def configure(self, url: Union[str, UsbDevice], frequency. * ``cs_count`` count of chip select signals dedicated to select SPI slave devices, starting from A*BUS3 pin + * ``active_high_cs`` iterable of CS slots that should be active high + instead of active low * ``turbo`` whether to enable or disable turbo mode * ``debug`` to increase log verbosity, using MPSSE tracer """ @@ -389,6 +397,13 @@ def configure(self, url: Union[str, UsbDevice], del kwargs['cs_count'] if not 1 <= self._cs_count <= 5: raise ValueError(f'Unsupported CS line count: {self._cs_count}') + if 'active_high_cs' in kwargs: + active_high_cs = set([int(x) for x in kwargs['active_high_cs']]) + del kwargs['active_high_cs'] + if max(active_high_cs) >= self._cs_count or min(active_high_cs) < 0: + raise ValueError(f'Invalid active high CS slots {active_high_cs} for {self._cs_count} CS slots') + else: + active_high_cs = [] if 'turbo' in kwargs: self._turbo = bool(kwargs['turbo']) del kwargs['turbo'] @@ -414,6 +429,7 @@ def configure(self, url: Union[str, UsbDevice], raise SpiIOError('Already configured') self._cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) & ~(SpiController.CS_BIT - 1)) + self._cs_idle = self._cs_bits ^ sum([SpiController.CS_BIT << x for x in active_high_cs]) self._spi_ports = [None] * self._cs_count self._spi_dir = (self._cs_bits | SpiController.DO_BIT | @@ -424,7 +440,7 @@ def configure(self, url: Union[str, UsbDevice], # delay any truncation till the device is actually open self._set_gpio_direction(16, (~self._spi_mask) & 0xFFFF, io_dir) kwargs['direction'] = self._spi_dir | self._gpio_dir - kwargs['initial'] = self._cs_bits | (io_out & self._gpio_mask) + kwargs['initial'] = self._cs_idle | (io_out & self._gpio_mask) if not isinstance(url, str): self._frequency = self._ftdi.open_mpsse_from_device( url, interface=interface, **kwargs) @@ -469,7 +485,7 @@ def get_port(self, cs: int, freq: Optional[float] = None, if cs >= len(self._spi_ports): if cs < 5: # increase cs_count (up to 4) to reserve more /CS channels - raise SpiIOError('/CS pin {cs} not reserved for SPI') + raise SpiIOError(f'/CS pin {cs} not reserved for SPI') raise SpiIOError(f'No such SPI port: {cs}') if not self._spi_ports[cs]: freq = min(freq or self._frequency, self.frequency_max) @@ -609,7 +625,7 @@ def exchange(self, frequency: float, :param cs_epilog: the epilog MPSSE command sequence to execute after the actual exchange. :param cpol: SPI clock polarity, derived from the SPI mode - :param cpol: SPI clock phase, derived from the SPI mode + :param cpha: SPI clock phase, derived from the SPI mode :param duplex: perform a full-duplex exchange (vs. half-duplex), i.e. bits are clocked in and out at once or in a write-then-read manner. @@ -793,7 +809,7 @@ def _exchange_half_duplex(self, frequency: float, ctrl |= self._gpio_low epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction)) # Restore idle state - cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low, + cs_high = [Ftdi.SET_BITS_LOW, self._cs_idle | self._gpio_low, direction] if not self._turbo: cs_high.append(Ftdi.SEND_IMMEDIATE) diff --git a/pyftdi/tests/spi.py b/pyftdi/tests/spi.py index dad6e3fd..bb59a501 100755 --- a/pyftdi/tests/spi.py +++ b/pyftdi/tests/spi.py @@ -361,12 +361,57 @@ def test_cs_default_pulse_rev_clock(self): self._port.force_select() +class SpiActiveHighTestCase(TestCase): + """Basic test for checking active high CS + + It requires a scope or a digital analyzer to validate the signal + waveforms. + """ + @classmethod + def setUpClass(cls): + cls.url = environ.get('FTDI_DEVICE', 'ftdi:///1') + cls.debug = to_bool(environ.get('FTDI_DEBUG', 'off')) + + def setUp(self): + self._spi = SpiController(cs_count=2) + self._spi.configure(self.url, debug=self.debug, active_high_cs=[1]) + self._port0 = self._spi.get_port(0, freq=1e6, mode=0) + self._port1 = self._spi.get_port(1, freq=1e6, mode=0) + + def tearDown(self): + self._spi.terminate() + + def testExchange(self): + for s in [self._port0, self._port1]: + s.exchange(bytes.fromhex('11223344'), 4) + s.exchange(bytes.fromhex('5566'), stop=False) + s.exchange(bytes.fromhex('7788'), start=False) + + def testRead(self): + for s in [self._port0, self._port1]: + s.read(readlen=5) + s.read(readlen=3, stop=False) + s.read(readlen=2, start=False) + + def testWrite(self): + for s in [self._port0, self._port1]: + s.write(b'dead') + s.write(b'be', stop=False) + s.write(b'ef', start=False) + + def testMixed(self): + for s in [self._port0, self._port1]: + s.write(b'01', stop=False) + s.exchange(b'02', readlen=2, start=False, stop=False) + s.read(readlen=2, start=False) + + def suite(): suite_ = TestSuite() loader = TestLoader() mod = modules[__name__] tests = ( # 'Spi', - 'SpiGpio', 'SpiUnaligned', 'SpiCsForce') + 'SpiGpio', 'SpiUnaligned', 'SpiCsForce', 'SpiActiveHigh') for testname in tests: testcase = getattr(mod, f'{testname}TestCase') suite_.addTest(loader.loadTestsFromTestCase(testcase))