From a1dd0977a1af8287c0f640306b1c9d4f2a7177cc Mon Sep 17 00:00:00 2001 From: acpiccolo <60399193+acpiccolo@users.noreply.github.com> Date: Mon, 11 Mar 2024 23:13:53 +0100 Subject: [PATCH] chore: add tokio support --- Cargo.toml | 7 +- src/lib.rs | 3 + src/main.rs | 19 +--- src/protocol.rs | 15 +++ src/serialport.rs | 40 ++++++-- src/tokio_serial.rs | 220 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 281 insertions(+), 23 deletions(-) create mode 100644 src/tokio_serial.rs diff --git a/Cargo.toml b/Cargo.toml index dc8d165..2b515d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,10 @@ path = "src/main.rs" required-features = ["bin-dependencies"] [features] +serde = ["dep:serde"] default = ["bin-dependencies"] serialport = ["dep:serialport", "dep:anyhow"] +tokio-async = ["dep:tokio-serial", "tokio/time", "tokio/io-util", "dep:anyhow"] bin-dependencies = [ "dep:anyhow", "serialport", @@ -26,10 +28,13 @@ bin-dependencies = [ ] [dependencies] +serde = { version = "1", features = ["derive"], optional = true } +serialport = { version = "4", optional = true } +tokio-serial = { version = "5", optional = true } +tokio = { version = "1", default-features = false, optional = true } # Requirements for bin log = { version = "0.4" } anyhow = { version = "1", optional = true } -serialport = { version = "4", optional = true } clap = { version = "4", optional = true } clap-verbosity-flag = { version = "2", optional = true } clap-num = { version = "1", optional = true } diff --git a/src/lib.rs b/src/lib.rs index 460b620..69d8b02 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,3 +5,6 @@ pub use error::Error; #[cfg(feature = "serialport")] pub mod serialport; + +#[cfg(feature = "tokio-async")] +pub mod tokio_serial; diff --git a/src/main.rs b/src/main.rs index a83485c..126f2ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -114,11 +114,9 @@ fn main() -> Result<()> { let _log_handle = logging_init(args.verbose.log_level_filter()); - // https://minimalmodbus.readthedocs.io/en/stable/serialcommunication.html#timing-of-the-serial-communications - // minimum delay 4ms by baud rate 9600 - let delay = Duration::max(args.delay, Duration::from_millis(4)); - - let mut bms = dalybms_lib::serialport::DalyBMS::new(&args.device, args.timeout)?; + let mut bms = dalybms_lib::serialport::DalyBMS::new(&args.device)?; + bms.set_timeout(args.timeout)?; + bms.set_delay(args.delay); match args.command { CliCommands::Status => { @@ -132,17 +130,14 @@ fn main() -> Result<()> { } CliCommands::CellVoltages => { let _ = bms.get_status()?; - std::thread::sleep(delay); println!("CellVoltages: {:?}", bms.get_cell_voltages()?); } CliCommands::Temperatures => { let _ = bms.get_status()?; - std::thread::sleep(delay); println!("Temperatures: {:?}", bms.get_cell_temperatures()?); } CliCommands::Balancing => { let _ = bms.get_status()?; - std::thread::sleep(delay); println!("Balancing: {:?}", bms.get_balancing_status()?); } CliCommands::Errors => { @@ -150,21 +145,13 @@ fn main() -> Result<()> { } CliCommands::All => { println!("Status: {:?}", bms.get_status()?); - std::thread::sleep(delay); println!("SOC: {:?}", bms.get_soc()?); - std::thread::sleep(delay); println!("CellVoltageRange: {:?}", bms.get_cell_voltage_range()?); - std::thread::sleep(delay); println!("TemperatureRange: {:?}", bms.get_temperature_range()?); - std::thread::sleep(delay); println!("Mosfet: {:?}", bms.get_mosfet_status()?); - std::thread::sleep(delay); println!("CellVoltages: {:?}", bms.get_cell_voltages()?); - std::thread::sleep(delay); println!("CellTemperatures: {:?}", bms.get_cell_temperatures()?); - std::thread::sleep(delay); println!("Balancing: {:?}", bms.get_balancing_status()?); - std::thread::sleep(delay); println!("Errors: {:?}", bms.get_errors()?); } CliCommands::SetSoc { soc_percent } => bms.set_soc(soc_percent)?, diff --git a/src/protocol.rs b/src/protocol.rs index b341231..8c964f6 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,12 +1,19 @@ use crate::Error; use std::fmt; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + #[derive(Debug)] #[repr(u8)] pub enum Address { Host = 0x40, } +// https://minimalmodbus.readthedocs.io/en/stable/serialcommunication.html#timing-of-the-serial-communications +// minimum delay 4ms by baud rate 9600 +pub const MINIMUM_DELAY: std::time::Duration = std::time::Duration::from_millis(4); + const TX_BUFFER_LENGTH: usize = 13; const RX_BUFFER_LENGTH: usize = 13; const START_BYTE: u8 = 0xa5; @@ -68,6 +75,7 @@ fn validate_checksum(buffer: &[u8]) -> std::result::Result<(), Error> { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct Soc { pub total_voltage: f32, pub current: f32, // negative=charging, positive=discharging @@ -99,6 +107,7 @@ impl Soc { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct CellVoltageRange { pub highest_voltage: f32, pub highest_cell: u8, @@ -130,6 +139,7 @@ impl CellVoltageRange { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct TemperatureRange { pub highest_temperature: i8, pub highest_sensor: u8, @@ -162,6 +172,7 @@ impl TemperatureRange { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum MosfetMode { Stationary, Charging, @@ -169,6 +180,7 @@ pub enum MosfetMode { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct MosfetStatus { pub mode: MosfetMode, pub charging_mosfet: bool, @@ -214,6 +226,7 @@ impl MosfetStatus { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct IOState { pub di1: bool, pub di2: bool, @@ -226,6 +239,7 @@ pub struct IOState { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct Status { pub cells: u8, pub temperature_sensors: u8, @@ -399,6 +413,7 @@ impl CellBalanceState { } #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum ErrorCode { CellVoltHighLevel1, CellVoltHighLevel2, diff --git a/src/serialport.rs b/src/serialport.rs index d472ed9..c264e6f 100644 --- a/src/serialport.rs +++ b/src/serialport.rs @@ -1,27 +1,38 @@ use crate::protocol::*; use anyhow::{bail, Context, Result}; -use std::time::Duration; +use std::time::{Duration, Instant}; +#[derive(Debug)] pub struct DalyBMS { serial: Box, + last_execution: Instant, + delay: Duration, status: Option, } impl DalyBMS { - pub fn new(port: &str, timeout: Duration) -> Result { + pub fn new(port: &str) -> Result { Ok(Self { serial: serialport::new(port, 9600) .data_bits(serialport::DataBits::Eight) .parity(serialport::Parity::None) .stop_bits(serialport::StopBits::One) .flow_control(serialport::FlowControl::None) - .timeout(timeout) .open() .with_context(|| format!("Cannot open serial port '{}'", port))?, + last_execution: Instant::now(), + delay: MINIMUM_DELAY, status: None, }) } + fn serial_await_delay(&self) { + let last_exec_diff = Instant::now().duration_since(self.last_execution); + if let Some(time_until_delay_reached) = self.delay.checked_sub(last_exec_diff) { + std::thread::sleep(time_until_delay_reached); + } + } + fn send_bytes(&mut self, tx_buffer: &[u8]) -> Result<()> { // clear all incoming serial to avoid data collision loop { @@ -41,13 +52,18 @@ impl DalyBMS { break; } } + self.serial_await_delay(); self.serial .write_all(tx_buffer) .with_context(|| "Cannot write to serial")?; - self.serial - .flush() - .with_context(|| "Cannot flush serial connection") + + if false { + self.serial + .flush() + .with_context(|| "Cannot flush serial connection")?; + } + Ok(()) } fn receive_bytes(&mut self, size: usize) -> Result> { @@ -59,10 +75,22 @@ impl DalyBMS { .read_exact(&mut rx_buffer) .with_context(|| "Cannot receive response")?; + self.last_execution = Instant::now(); + log::trace!("receive_bytes: {:02X?}", rx_buffer); Ok(rx_buffer) } + pub fn set_timeout(&mut self, timeout: Duration) -> Result<()> { + self.serial + .set_timeout(timeout) + .map_err(|err| anyhow::Error::from(err)) + } + + pub fn set_delay(&mut self, delay: Duration) { + self.delay = Duration::max(delay, MINIMUM_DELAY); + } + pub fn get_soc(&mut self) -> Result { self.send_bytes(&Soc::request(Address::Host))?; Soc::decode(&self.receive_bytes(Soc::reply_size())?).with_context(|| "Cannot get SOC") diff --git a/src/tokio_serial.rs b/src/tokio_serial.rs new file mode 100644 index 0000000..199bbc0 --- /dev/null +++ b/src/tokio_serial.rs @@ -0,0 +1,220 @@ +use crate::protocol::*; +use anyhow::{bail, Context, Result}; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio_serial::{SerialPort, SerialPortBuilderExt}; + +#[derive(Debug)] +pub struct DalyBMS { + serial: tokio_serial::SerialStream, + last_execution: Instant, + io_timeout: Duration, + delay: Duration, + status: Option, +} + +impl DalyBMS { + pub fn new(port: &str) -> Result { + Ok(Self { + serial: tokio_serial::new(port, 9600) + .data_bits(tokio_serial::DataBits::Eight) + .parity(tokio_serial::Parity::None) + .stop_bits(tokio_serial::StopBits::One) + .flow_control(tokio_serial::FlowControl::None) + .open_native_async() + .with_context(|| format!("Cannot open serial port '{}'", port))?, + last_execution: Instant::now(), + delay: MINIMUM_DELAY, + io_timeout: Duration::from_secs(5), + status: None, + }) + } + + async fn serial_await_delay(&self) { + let last_exec_diff = Instant::now().duration_since(self.last_execution); + if let Some(time_until_delay_reached) = self.delay.checked_sub(last_exec_diff) { + tokio::time::sleep(time_until_delay_reached).await; + } + } + + async fn send_bytes(&mut self, tx_buffer: &[u8]) -> Result<()> { + // clear all incoming serial to avoid data collision + loop { + let pending = self + .serial + .bytes_to_read() + .with_context(|| "Cannot read number of pending bytes")?; + if pending > 0 { + log::trace!("Got {} pending bytes", pending); + let mut buf: Vec = vec![0; 64]; + + let received = + tokio::time::timeout(self.io_timeout, self.serial.read(buf.as_mut_slice())) + .await + .with_context(|| "Cannot read pending bytes")??; + log::trace!("Read {} pending bytes", received); + } else { + break; + } + } + self.serial_await_delay().await; + + tokio::time::timeout(self.io_timeout, self.serial.write_all(tx_buffer)) + .await + .with_context(|| "Cannot write to serial")??; + + if false { + tokio::time::timeout(self.io_timeout, self.serial.flush()) + .await + .with_context(|| "Cannot flush serial connection")??; + } + Ok(()) + } + + async fn receive_bytes(&mut self, size: usize) -> Result> { + // Clear out the input buffer + let mut rx_buffer = vec![0; size]; + + // Read bytes from the specified serial interface + tokio::time::timeout(self.io_timeout, self.serial.read_exact(&mut rx_buffer)) + .await + .with_context(|| "Cannot receive response")??; + + self.last_execution = Instant::now(); + + log::trace!("receive_bytes: {:02X?}", rx_buffer); + Ok(rx_buffer) + } + + pub fn set_timeout(&mut self, timeout: Duration) -> Result<()> { + self.io_timeout = timeout; + Ok(()) + // self.serial + // .set_timeout(timeout) + // .map_err(|err| anyhow::Error::from(err)) + } + + pub fn set_delay(&mut self, delay: Duration) { + self.delay = Duration::max(delay, MINIMUM_DELAY); + } + + pub async fn get_soc(&mut self) -> Result { + self.send_bytes(&Soc::request(Address::Host)).await?; + Soc::decode(&self.receive_bytes(Soc::reply_size()).await?).with_context(|| "Cannot get SOC") + } + + pub async fn get_cell_voltage_range(&mut self) -> Result { + self.send_bytes(&CellVoltageRange::request(Address::Host)) + .await?; + CellVoltageRange::decode(&self.receive_bytes(CellVoltageRange::reply_size()).await?) + .with_context(|| "Cannot get cell voltage range") + } + + pub async fn get_temperature_range(&mut self) -> Result { + self.send_bytes(&TemperatureRange::request(Address::Host)) + .await?; + TemperatureRange::decode(&self.receive_bytes(TemperatureRange::reply_size()).await?) + .with_context(|| "Cannot get temperature range") + } + + pub async fn get_mosfet_status(&mut self) -> Result { + self.send_bytes(&MosfetStatus::request(Address::Host)) + .await?; + MosfetStatus::decode(&self.receive_bytes(MosfetStatus::reply_size()).await?) + .with_context(|| "Cannot get mosfet status") + } + + pub async fn get_status(&mut self) -> Result { + self.send_bytes(&Status::request(Address::Host)).await?; + let status = Status::decode(&self.receive_bytes(Status::reply_size()).await?) + .with_context(|| "Cannot get status")?; + self.status = Some(status.clone()); + Ok(status) + } + + pub async fn get_cell_voltages(&mut self) -> Result> { + let n_cells = if let Some(status) = &self.status { + status.cells + } else { + bail!("get_status() has to be called at least once before calling get_cell_voltages()"); + }; + self.send_bytes(&CellVoltages::request(Address::Host)) + .await?; + CellVoltages::decode( + &self + .receive_bytes(CellVoltages::reply_size(n_cells)) + .await?, + n_cells, + ) + .with_context(|| "Cannot get cell voltages") + } + + pub async fn get_cell_temperatures(&mut self) -> Result> { + let n_sensors = if let Some(status) = &self.status { + status.temperature_sensors + } else { + bail!("get_status() has to be called at least once before calling get_cell_temperatures()"); + }; + + self.send_bytes(&CellTemperatures::request(Address::Host)) + .await?; + CellTemperatures::decode( + &self + .receive_bytes(CellTemperatures::reply_size(n_sensors)) + .await?, + n_sensors, + ) + .with_context(|| "Cannot get cell temperatures") + } + + pub async fn get_balancing_status(&mut self) -> Result> { + let n_cells = if let Some(status) = &self.status { + status.cells + } else { + bail!( + "get_status() has to be called at least once before calling get_balancing_status()" + ); + }; + + self.send_bytes(&CellBalanceState::request(Address::Host)) + .await?; + CellBalanceState::decode( + &self.receive_bytes(CellBalanceState::reply_size()).await?, + n_cells, + ) + .with_context(|| "Cannot get cell balancing status") + } + + pub async fn get_errors(&mut self) -> Result> { + self.send_bytes(&ErrorCode::request(Address::Host)).await?; + ErrorCode::decode(&self.receive_bytes(ErrorCode::reply_size()).await?) + .with_context(|| "Cannot get errors") + } + + pub async fn set_discharge_mosfet(&mut self, enable: bool) -> Result<()> { + self.send_bytes(&SetDischargeMosfet::request(Address::Host, enable)) + .await?; + SetDischargeMosfet::decode(&self.receive_bytes(SetDischargeMosfet::reply_size()).await?) + .with_context(|| "Cannot set discharge mosfet") + } + + pub async fn set_charge_mosfet(&mut self, enable: bool) -> Result<()> { + self.send_bytes(&SetChargeMosfet::request(Address::Host, enable)) + .await?; + SetChargeMosfet::decode(&self.receive_bytes(SetChargeMosfet::reply_size()).await?) + .with_context(|| "Cannot set charge mosfet") + } + + pub async fn set_soc(&mut self, soc_percent: f32) -> Result<()> { + self.send_bytes(&SetSoc::request(Address::Host, soc_percent)) + .await?; + SetSoc::decode(&self.receive_bytes(SetSoc::reply_size()).await?) + .with_context(|| "Cannot set SOC") + } + + pub async fn reset(&mut self) -> Result<()> { + self.send_bytes(&BmsReset::request(Address::Host)).await?; + BmsReset::decode(&self.receive_bytes(BmsReset::reply_size()).await?) + .with_context(|| "Cannot reset BMS") + } +}