Debugging utility like RapidScada Modbus Parser #1407
Replies: 4 comments 9 replies
-
At the moment it does not. We have the REPL client/server, which does it partly, and we have pymodbus.simulator, which will have it shortly (but only for the server). We are currently refactoring the client and will take your idea into account, because we do have the information needed to do it.
-
Feel free to submit an issue or, even better, a pull request with the implementation.
-
I had to do some in-depth Modbus TCP debugging this week with Rapid SCADA Modbus Parser, and ended up writing this code:

from dataclasses import dataclass
from html.parser import HTMLParser

import requests

RAPID_SCADA_URL = "https://rapidscada.net/modbus/"


@dataclass(frozen=True)
class ParsedModbusResult:
    transaction_id: int
    length: int
    unit_id: int
    func_code: int


@dataclass(frozen=True)
class ParsedModbusRequest(ParsedModbusResult):
    zero_index_reg: int
    quantity: int


@dataclass(frozen=True)
class ParsedModbusResponse(ParsedModbusResult):
    byte_count: int
    registers: list[int]


def explain_with_rapid_scada(
    packet: str,
    is_modbus_tcp: bool = True,
    is_receive: bool = False,
    timeout: float | tuple[float, float] | None = 15.0,
) -> ParsedModbusResult:
    """
    Explain a Modbus packet using https://rapidscada.net/modbus/.

    Args:
        packet: Packet from pymodbus logs.
        is_modbus_tcp: Set True (default) for Modbus TCP or False for Modbus RTU.
        is_receive: Set True if pymodbus log says RECV, otherwise False for SEND.
        timeout: Timeout (sec) for the HTTP post.

    Returns:
        Parsed data from Rapid SCADA Modbus Parser.
    """

    class NonEmptyDataFromHTML(HTMLParser):
        """Aggregate all data from an HTML blob."""

        def __init__(self, *, convert_charrefs=True):
            super().__init__(convert_charrefs=convert_charrefs)
            self._data = []

        @property
        def data(self) -> list[str]:
            return self._data

        def handle_data(self, data: str) -> None:
            if data.strip() == "":
                return
            self._data.append(data.strip())

    # Normalize each logged byte (e.g. "0x6") to two upper-case hex digits ("06").
    data_packet = "+".join(
        [f"{int(hex_str, base=16):02X}" for hex_str in packet.split(" ")],
    )
    raw_response: requests.Response = requests.post(
        f"{RAPID_SCADA_URL}?ModbusMode={int(is_modbus_tcp)}"
        f"&DataDirection={int(is_receive)}&DataPackage={data_packet}",
        timeout=timeout,
    )
    raw_response.raise_for_status()
    parser = NonEmptyDataFromHTML()
    parser.feed(raw_response.text)

    def get_next_field(prior_field: str, data: list[str] = parser.data) -> str:
        # The value follows its label in the flattened HTML text.
        return data[data.index(prior_field) + 1]

    def parse_next_field(prior_field: str, split_index: int = 0) -> int:
        return int(get_next_field(prior_field).split(" ")[split_index], base=16)

    base_result_data = {
        "transaction_id": parse_next_field("Transaction identifier"),
        "length": parse_next_field("Length"),
        "unit_id": parse_next_field("Unit identifier"),
        "func_code": parse_next_field("Function code"),
    }
    if not is_receive:
        return ParsedModbusRequest(
            **base_result_data,
            zero_index_reg=parse_next_field("Starting address", split_index=1),
            quantity=parse_next_field("Quantity"),
        )
    return ParsedModbusResponse(
        **base_result_data,
        byte_count=parse_next_field("Byte count"),
        registers=[
            int(raw_value.split(" ")[0], base=16)
            for raw_value in get_next_field("Register value").split(", ")
        ],
    )


explained_send = explain_with_rapid_scada(
    "0x6e 0x46 0x0 0x0 0x0 0x6 0x1 0x3 0x0 0x6 0x0 0x2"
)
print(explained_send)
explained_recv = explain_with_rapid_scada(
    "0x6e 0x46 0x0 0x0 0x0 0x7 0x1 0x3 0x4 0x0 0x0 0x6 0xfb", is_receive=True
)
print(explained_recv)

Hope it helps someone!
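
For cases where calling an external website is not an option, the same fields can be decoded locally from the raw bytes. The following is only a minimal sketch, assuming Modbus TCP framing (7-byte MBAP header followed by the function code) and handling only function code 0x03 (read holding registers). The names `packet_to_bytes` and `decode_tcp_frame` are hypothetical helpers for illustration and are not part of pymodbus.

```python
import struct


def packet_to_bytes(packet: str) -> bytes:
    """Convert a pymodbus log string such as '0x6e 0x46 0x0' to raw bytes."""
    return bytes(int(token, 16) for token in packet.split())


def decode_tcp_frame(packet: str, is_receive: bool = False) -> dict:
    """Decode a Modbus TCP frame (hypothetical helper, function 0x03 only)."""
    frame = packet_to_bytes(packet)
    # MBAP header: transaction id, protocol id, length (big-endian 16-bit each),
    # followed by a 1-byte unit id. The function code is the next byte.
    transaction_id, protocol_id, length, unit_id = struct.unpack(">HHHB", frame[:7])
    func_code = frame[7]
    result = {
        "transaction_id": transaction_id,
        "protocol_id": protocol_id,
        "length": length,
        "unit_id": unit_id,
        "func_code": func_code,
    }
    if func_code != 0x03:
        # Assumption: other function codes are left undecoded in this sketch.
        return result
    if is_receive:
        # Response: byte count, then one big-endian 16-bit value per register.
        byte_count = frame[8]
        registers = struct.unpack(f">{byte_count // 2}H", frame[9:9 + byte_count])
        result.update(byte_count=byte_count, registers=list(registers))
    else:
        # Request: zero-indexed starting address and register quantity.
        zero_index_reg, quantity = struct.unpack(">HH", frame[8:12])
        result.update(zero_index_reg=zero_index_reg, quantity=quantity)
    return result


if __name__ == "__main__":
    print(decode_tcp_frame("0x6e 0x46 0x0 0x0 0x0 0x6 0x1 0x3 0x0 0x6 0x0 0x2"))
    print(
        decode_tcp_frame(
            "0x6e 0x46 0x0 0x0 0x0 0x7 0x1 0x3 0x4 0x0 0x0 0x6 0xfb",
            is_receive=True,
        )
    )
```

This sketch does no validation (for example, it does not check the length field against the actual frame size or handle exception responses), so it is a debugging aid rather than a full parser.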
-
Thanks to a couple of PRs, this idea is now available to everybody.
-
I can decode pymodbus.logging messages of Modbus TCP using Rapid SCADA Modbus Parser.
Does pymodbus have a similar "decoding" utility I can use for debugging? It doesn't have to be GUI-based; a command-line tool or an importable function would be fine.