Skip to content

Add support for active high CS #382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyftdi/eeprom.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def set_property(self, name: str, value: Union[str, int, bool],
mobj = match(r'cbus_func_(\d)', name)
if mobj:
if not isinstance(value, str):
raise ValueError("'{name}' should be specified as a string")
raise ValueError(f"'{name}' should be specified as a string")
self._set_cbus_func(int(mobj.group(1)), value, out)
self._dirty.add(name)
return
Expand Down
2 changes: 1 addition & 1 deletion pyftdi/ftdi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2267,7 +2267,7 @@ def _set_baudrate(self, baudrate: int, constrain: bool) -> int:
raise FtdiError('Unable to set baudrate')
return actual
except USBError as exc:
raise FtdiError('UsbError: {exc}') from exc
raise FtdiError(f'UsbError: {exc}') from exc

def _set_frequency(self, frequency: float) -> float:
"""Convert a frequency value into a TCK divisor setting"""
Expand Down
34 changes: 25 additions & 9 deletions pyftdi/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,8 @@ def set_mode(self, mode: int, cs_hold: Optional[int] = None) -> None:
self._cs_hold = cs_hold
self._cpol = bool(mode & 0x2)
self._cpha = bool(mode & 0x1)
cs_clock = 0xFF & ~((int(not self._cpol) and SpiController.SCK_BIT) |
SpiController.DO_BIT)
cs_select = 0xFF & ~((SpiController.CS_BIT << self._cs) |
(int(not self._cpol) and SpiController.SCK_BIT) |
SpiController.DO_BIT)
cs_clock = self._controller._cs_idle | (int(self._cpol) and SpiController.SCK_BIT)
cs_select = cs_clock ^ (SpiController.CS_BIT << self._cs)
self._cs_prolog = bytes([cs_clock, cs_select])
self._cs_epilog = bytes([cs_select] + [cs_clock] * int(cs_hold))

Expand Down Expand Up @@ -210,6 +207,14 @@ def cs(self) -> int:
:return: the /CS index (starting from 0)
"""
return self._cs

@property
def cs_active_high(self) -> bool:
"""Is the CS active high?

:return: True if it's active high, False if active low
"""
(self._controller._cs_idle & (SpiController.CS_BIT << self._cs)) == 0

@property
def mode(self) -> int:
Expand Down Expand Up @@ -352,6 +357,7 @@ def __init__(self, cs_count: int = 1, turbo: bool = True):
self._spi_ports = []
self._spi_dir = 0
self._spi_mask = self.SPI_BITS
self._cs_idle = 0xff

def configure(self, url: Union[str, UsbDevice],
**kwargs: Mapping[str, Any]) -> None:
Expand All @@ -378,6 +384,8 @@ def configure(self, url: Union[str, UsbDevice],
frequency.
* ``cs_count`` count of chip select signals dedicated to select
SPI slave devices, starting from A*BUS3 pin
* ``active_high_cs`` iterable of CS slots that should be active high
instead of active low
* ``turbo`` whether to enable or disable turbo mode
* ``debug`` to increase log verbosity, using MPSSE tracer
"""
Expand All @@ -389,6 +397,13 @@ def configure(self, url: Union[str, UsbDevice],
del kwargs['cs_count']
if not 1 <= self._cs_count <= 5:
raise ValueError(f'Unsupported CS line count: {self._cs_count}')
if 'active_high_cs' in kwargs:
active_high_cs = set([int(x) for x in kwargs['active_high_cs']])
del kwargs['active_high_cs']
if max(active_high_cs) >= self._cs_count or min(active_high_cs) < 0:
raise ValueError(f'Invalid active high CS slots {active_high_cs} for {self._cs_count} CS slots')
else:
active_high_cs = []
if 'turbo' in kwargs:
self._turbo = bool(kwargs['turbo'])
del kwargs['turbo']
Expand All @@ -414,6 +429,7 @@ def configure(self, url: Union[str, UsbDevice],
raise SpiIOError('Already configured')
self._cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) &
~(SpiController.CS_BIT - 1))
self._cs_idle = self._cs_bits ^ sum([SpiController.CS_BIT << x for x in active_high_cs])
self._spi_ports = [None] * self._cs_count
self._spi_dir = (self._cs_bits |
SpiController.DO_BIT |
Expand All @@ -424,7 +440,7 @@ def configure(self, url: Union[str, UsbDevice],
# delay any truncation till the device is actually open
self._set_gpio_direction(16, (~self._spi_mask) & 0xFFFF, io_dir)
kwargs['direction'] = self._spi_dir | self._gpio_dir
kwargs['initial'] = self._cs_bits | (io_out & self._gpio_mask)
kwargs['initial'] = self._cs_idle | (io_out & self._gpio_mask)
if not isinstance(url, str):
self._frequency = self._ftdi.open_mpsse_from_device(
url, interface=interface, **kwargs)
Expand Down Expand Up @@ -469,7 +485,7 @@ def get_port(self, cs: int, freq: Optional[float] = None,
if cs >= len(self._spi_ports):
if cs < 5:
# increase cs_count (up to 4) to reserve more /CS channels
raise SpiIOError('/CS pin {cs} not reserved for SPI')
raise SpiIOError(f'/CS pin {cs} not reserved for SPI')
raise SpiIOError(f'No such SPI port: {cs}')
if not self._spi_ports[cs]:
freq = min(freq or self._frequency, self.frequency_max)
Expand Down Expand Up @@ -609,7 +625,7 @@ def exchange(self, frequency: float,
:param cs_epilog: the epilog MPSSE command sequence to execute
after the actual exchange.
:param cpol: SPI clock polarity, derived from the SPI mode
:param cpol: SPI clock phase, derived from the SPI mode
:param cpha: SPI clock phase, derived from the SPI mode
:param duplex: perform a full-duplex exchange (vs. half-duplex),
i.e. bits are clocked in and out at once or
in a write-then-read manner.
Expand Down Expand Up @@ -793,7 +809,7 @@ def _exchange_half_duplex(self, frequency: float,
ctrl |= self._gpio_low
epilog.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
# Restore idle state
cs_high = [Ftdi.SET_BITS_LOW, self._cs_bits | self._gpio_low,
cs_high = [Ftdi.SET_BITS_LOW, self._cs_idle | self._gpio_low,
direction]
if not self._turbo:
cs_high.append(Ftdi.SEND_IMMEDIATE)
Expand Down
2 changes: 1 addition & 1 deletion pyftdi/tests/backend/ftdivirt.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ def _build_eeprom(cls, version, eeprom: Optional[dict]) -> bytearray:
try:
size = cls.EXT_EEPROMS[model.lower()]
except KeyError as exc:
raise ValueError('Unsupported EEPROM model: {model}') \
raise ValueError(f'Unsupported EEPROM model: {model}') \
from exc
data = eeprom.get('data', b'')
if version in cls.INT_EEPROMS:
Expand Down
2 changes: 1 addition & 1 deletion pyftdi/tests/backend/mpsse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class VirtMpsseTracer(FtdiMpsseTracer):

def __init__(self, port: 'VirtFtdiPort', version: int):
super().__init__(version)
self.log = getLogger('pyftdi.virt.mpsse.{port.iface}')
self.log = getLogger(f'pyftdi.virt.mpsse.{port.iface}')
self._port = port

def _get_engine(self, iface: int):
Expand Down
2 changes: 1 addition & 1 deletion pyftdi/tests/backend/usbvirt.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def get_virtual_ftdi(self, bus: int, address: int) -> VirtFtdi:
for dev in self._devices:
if dev.bus == bus and dev.address == address:
return dev.ftdi
raise ValueError('No FTDI @ {bus:address}')
raise ValueError(f'No FTDI @ {bus:address}')

def enumerate_devices(self) -> VirtDevice:
yield from self._devices
Expand Down
47 changes: 46 additions & 1 deletion pyftdi/tests/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,57 @@ def test_cs_default_pulse_rev_clock(self):
self._port.force_select()


class SpiActiveHighTestCase(TestCase):
"""Basic test for checking active high CS

It requires a scope or a digital analyzer to validate the signal
waveforms.
"""
@classmethod
def setUpClass(cls):
cls.url = environ.get('FTDI_DEVICE', 'ftdi:///1')
cls.debug = to_bool(environ.get('FTDI_DEBUG', 'off'))

def setUp(self):
self._spi = SpiController(cs_count=2)
self._spi.configure(self.url, debug=self.debug, active_high_cs=[1])
self._port0 = self._spi.get_port(0, freq=1e6, mode=0)
self._port1 = self._spi.get_port(1, freq=1e6, mode=0)

def tearDown(self):
self._spi.terminate()

def testExchange(self):
for s in [self._port0, self._port1]:
s.exchange(bytes.fromhex('11223344'), 4)
s.exchange(bytes.fromhex('5566'), stop=False)
s.exchange(bytes.fromhex('7788'), start=False)

def testRead(self):
for s in [self._port0, self._port1]:
s.read(readlen=5)
s.read(readlen=3, stop=False)
s.read(readlen=2, start=False)

def testWrite(self):
for s in [self._port0, self._port1]:
s.write(b'dead')
s.write(b'be', stop=False)
s.write(b'ef', start=False)

def testMixed(self):
for s in [self._port0, self._port1]:
s.write(b'01', stop=False)
s.exchange(b'02', readlen=2, start=False, stop=False)
s.read(readlen=2, start=False)


def suite():
suite_ = TestSuite()
loader = TestLoader()
mod = modules[__name__]
tests = ( # 'Spi',
'SpiGpio', 'SpiUnaligned', 'SpiCsForce')
'SpiGpio', 'SpiUnaligned', 'SpiCsForce', 'SpiActiveHigh')
for testname in tests:
testcase = getattr(mod, f'{testname}TestCase')
suite_.addTest(loader.loadTestsFromTestCase(testcase))
Expand Down
4 changes: 2 additions & 2 deletions pyftdi/usbtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,15 @@ def parse_url(cls, urlstr: str, scheme: str,
if len(cvendors) == 1:
vendor = cvendors.pop()
if vendor not in pdict:
vstr = '0x{vendor:04x}' if vendor is not None else '?'
vstr = f'0x{vendor:04x}' if vendor is not None else '?'
raise UsbToolsError(f'Vendor ID {vstr} not supported')
if not product:
cproducts = {candidate[1] for candidate in candidates
if candidate[0] == vendor}
if len(cproducts) == 1:
product = cproducts.pop()
if product not in pdict[vendor].values():
pstr = '0x{vendor:04x}' if product is not None else '?'
pstr = f'0x{vendor:04x}' if product is not None else '?'
raise UsbToolsError(f'Product ID {pstr} not supported')
devdesc = UsbDeviceDescriptor(vendor, product, desc.bus, desc.address,
desc.sn, idx, desc.description)
Expand Down