Skip to content

Commit

Permalink
Add sdo write callback to local node for tpdo configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
grantweiss-RaymondCorp committed Jul 29, 2024
1 parent b5e546f commit 822009c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
23 changes: 22 additions & 1 deletion canopen/node/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
self.emcy = EmcyProducer(0x80 + self.id)

self.nmt.add_state_change_callback(self._nmt_state_changed)
self.add_write_callback(self._tpdo_configuration_write)

def associate_network(self, network):
self.network = network
Expand Down Expand Up @@ -144,4 +145,24 @@ def _nmt_state_changed(self, old_state, new_state):
else:
logger.info(f"TPDO {i} not enabled")
elif old_state == "OPERATIONAL":
self.tpdo.stop()
self.tpdo.stop()

def _tpdo_configuration_write(self, index, subindex, od, data):
if 0x1800 <= index <= 0x19FF:
# Only allowed to edit pdo configuration in pre-op
if self.nmt.state != "PRE-OPERATIONAL":
logger.warning("Tried to configure tpdo when not in pre-op")
return

if subindex == 0x01:
if len(data) == 4:
tpdoNum = (index - 0x1800) + 1
if tpdoNum in self.tpdo.map.keys():
PDO_NOT_VALID = 1 << 31
RTR_NOT_ALLOWED = 1 << 30

cob_id = int.from_bytes(data, 'little') & 0x7FF

self.tpdo.map[tpdoNum].cob_id = cob_id & 0x1FFFFFFF
self.tpdo.map[tpdoNum].enabled = cob_id & PDO_NOT_VALID == 0
self.tpdo.map[tpdoNum].rtr_allowed = cob_id & RTR_NOT_ALLOWED == 0
22 changes: 22 additions & 0 deletions test/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,28 @@ def test_send_pdo_on_operational(self):

self.assertNotEqual(self.local_node.tpdo[1]._task, None)

def test_config_pdo(self):
# Disable tpdo 1
self.local_node.tpdo[1].enabled = False
self.local_node.tpdo[1].cob_id = 0
self.local_node.tpdo[1].period = 0.5 # manually assign a period

self.local_node.nmt.state = 'INITIALISING'
self.local_node.nmt.state = 'PRE-OPERATIONAL'

# Attempt to re-enable tpdo 1 via sdo writing
PDO_NOT_VALID = 1 << 31
odDefaultVal = self.local_node.object_dictionary["Transmit PDO 0 communication parameters.COB-ID use by TPDO 1"].default
enabledCobId = odDefaultVal & ~PDO_NOT_VALID # Ensure invalid bit is not set

self.remote_node.sdo["Transmit PDO 0 communication parameters.COB-ID use by TPDO 1"].raw = enabledCobId

# Transition to operational
self.local_node.nmt.state = 'OPERATIONAL'

# Ensure tpdo automatically started with transition
self.assertNotEqual(self.local_node.tpdo[1]._task, None)


if __name__ == "__main__":
unittest.main()

0 comments on commit 822009c

Please sign in to comment.