Skip to content

Commit

Permalink
Use kwargs for pymodbus calls (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexrudd2 authored Jan 21, 2025
1 parent 0945b1d commit b194e29
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 18 deletions.
15 changes: 5 additions & 10 deletions clickplc/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,22 @@ def __init__(self, address, tag_filepath='', timeout=1):
self.bigendian = Endian.BIG if self.pymodbus35plus else Endian.Big # type: ignore[attr-defined]
self.lilendian = Endian.LITTLE if self.pymodbus35plus else Endian.Little # type: ignore[attr-defined]

async def _request(self, method, *args, **kwargs):
async def _request(self, method, address, count=0, values=(), **kwargs):
if method == 'read_coils':
address, count = args
return ReadCoilsResponse([self._coils[address + i] for i in range(count)])
if method == 'read_discrete_inputs':
address, count = args
return ReadDiscreteInputsResponse([self._discrete_inputs[address + i]
for i in range(count)])
elif method == 'read_holding_registers':
address, count = args
return ReadHoldingRegistersResponse([int.from_bytes(self._registers[address + i],
byteorder='big')
for i in range(count)])
elif method == 'write_coils':
address, data = args
for i, d in enumerate(data):
for i, d in enumerate(values):
self._coils[address + i] = d
return WriteMultipleCoilsResponse(address, data)
return WriteMultipleCoilsResponse(address, values)
elif method == 'write_registers':
address, data = args
for i, d in enumerate(data):
for i, d in enumerate(values):
self._registers[address + i] = d
return WriteMultipleRegistersResponse(address, data)
return WriteMultipleRegistersResponse(address, values)
return NotImplementedError(f'Unrecognised method: {method}')
16 changes: 8 additions & 8 deletions clickplc/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def _connect(self) -> None:

async def read_coils(self, address: int, count):
"""Read modbus output coils (0 address prefix)."""
return await self._request('read_coils', address, count)
return await self._request('read_coils', address=address, count=count)

async def read_registers(self, address: int, count):
"""Read modbus registers.
Expand All @@ -73,16 +73,16 @@ async def read_registers(self, address: int, count):
"""
registers = []
while count > 124:
r = await self._request('read_holding_registers', address, 124)
r = await self._request('read_holding_registers', address=address, count=124)
registers += r.registers
address, count = address + 124, count - 124
r = await self._request('read_holding_registers', address, count)
r = await self._request('read_holding_registers', address=address, count=count)
registers += r.registers
return registers

async def write_coils(self, address: int, values):
"""Write modbus coils."""
await self._request('write_coils', address, values)
await self._request('write_coils', address=address, values=values)

async def write_registers(self, address: int, values, skip_encode=False):
"""Write modbus registers.
Expand All @@ -92,11 +92,11 @@ async def write_registers(self, address: int, values, skip_encode=False):
chunking larger requests.
"""
while len(values) > 62:
await self._request('write_registers',
address, values, skip_encode=skip_encode)
await self._request('write_registers', address=address, values=values,
skip_encode=skip_encode)
address, values = address + 124, values[62:]
await self._request('write_registers',
address, values, skip_encode=skip_encode)
await self._request('write_registers', address=address, values=values,
skip_encode=skip_encode)

async def _request(self, method, *args, **kwargs):
"""Send a request to the device and awaits a response.
Expand Down

0 comments on commit b194e29

Please sign in to comment.