Skip to content

Commit

Permalink
Add robust error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
wrongsyntax committed Feb 8, 2025
1 parent aaa0a75 commit 4ad334a
Showing 1 changed file with 157 additions and 87 deletions.
244 changes: 157 additions & 87 deletions src/modules/autopilot/altimeter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
import smbus2
import time
from enum import Enum
from typing import Optional, List, Dict, Tuple


DEBUG = False

class SensorError(Exception):
"""Base exception class for sensor errors"""
pass


class SensorState(Enum):
"""Enum to track sensor state"""
UNINITIALIZED = 0
INITIALIZED = 1
ERROR = 2
NEEDS_CALIBRATION = 3


class SensorReflectorShape(Enum):
"""Enum to define reflector shape"""
GENERIC = 1
PLANAR = 2


class MovingAverage:
def __init__(self, window_size=5):
Expand All @@ -23,9 +43,13 @@ def get_average(self):
def is_valid(self):
return len(self.values) == self.window_size

def reset(self):
"""Clear all values"""
self.values = []


class XM125:
# Register addresses from documentation
# Register addresses
REG_VERSION = 0x0000
REG_PROTOCOL_STATUS = 0x0001
REG_MEASURE_COUNTER = 0x0002
Expand All @@ -41,9 +65,11 @@ class XM125:
# Command values
CMD_APPLY_CONFIG_AND_CALIBRATE = 1
CMD_MEASURE_DISTANCE = 2
CMD_RECALIBRATE = 5
CMD_RESET_MODULE = 1381192737

# Value enums
REFLECTOR_SHAPE = 1 # 1 = Generic, 2 = Planar
# Config values
REFLECTOR_SHAPE = SensorReflectorShape.PLANAR.value

# Status masks
DETECTOR_STATUS_BUSY_MASK = 0x80000000
Expand All @@ -52,16 +78,24 @@ class XM125:
DISTANCE_RESULT_CALIBRATION_NEEDED = 0x00000200
DISTANCE_RESULT_MEASURE_DISTANCE_ERROR = 0x00000400

# Error recovery constants
MAX_RETRIES = 3
RETRY_DELAY = 0.5 # seconds
ERROR_TIMEOUT = 5.0 # seconds

def __init__(self, bus=1, address=0x52, average_window=5):
self.bus = smbus2.SMBus(bus)
self.address = address
self.distance_avg = MovingAverage()
self.strength_avg = MovingAverage()

def _read_register(self, reg_addr):
"""Read a 32-bit register value."""
self.distance_avg = MovingAverage(average_window)
self.strength_avg = MovingAverage(average_window)
self.state = SensorState.UNINITIALIZED
self.last_error_time = 0
self.error_count = 0
self.consecutive_errors = 0

def _read_register(self, reg_addr) -> Optional[int]:
"""Read a 32-bit register value with error handling."""
try:
# Write register address (big endian)
write = smbus2.i2c_msg.write(self.address, [(reg_addr >> 8) & 0xFF, reg_addr & 0xFF])
read = smbus2.i2c_msg.read(self.address, 4)

Expand All @@ -70,13 +104,8 @@ def _read_register(self, reg_addr):
print(f" Register: 0x{reg_addr:04x}")
print(f" Sending address bytes: MSB=0x{(reg_addr >> 8) & 0xFF:02x}, LSB=0x{reg_addr & 0xFF:02x}")

# Execute the transaction with repeated start
self.bus.i2c_rdwr(write, read)

# Convert the read result to bytes
data = list(read)

# Convert to 32-bit integer (big endian as per protocol)
value = (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]

if DEBUG:
Expand All @@ -85,13 +114,12 @@ def _read_register(self, reg_addr):

return value
except IOError as e:
print(f"I/O error reading register 0x{reg_addr:04x}: {e}")
self.handle_error(f"I/O error reading register 0x{reg_addr:04x}: {e}")
return None

def _write_register(self, reg_addr, value):
"""Write a 32-bit register value."""
def _write_register(self, reg_addr: int, value: int) -> bool:
"""Write a 32-bit register value with error handling."""
try:
# Prepare all 6 bytes as per protocol example
data = [
(reg_addr >> 8) & 0xFF, # Address to slave [15:8]
reg_addr & 0xFF, # Address to slave [7:0]
Expand All @@ -105,16 +133,35 @@ def _write_register(self, reg_addr, value):
print(f"\nWRITE OPERATION:")
print(f" Register: 0x{reg_addr:04x}")
print(f" Value to write: 0x{value:08x} ({value})")
print(
f" Sending bytes: [0x{data[0]:02x}, 0x{data[1]:02x}, 0x{data[2]:02x}, 0x{data[3]:02x}, 0x{data[4]:02x}, 0x{data[5]:02x}]")
print(f" Sending bytes: {[f'0x{b:02x}' for b in data]}")

# Write all bytes in a single transaction
self.bus.write_i2c_block_data(self.address, data[0], data[1:])
return True
except IOError as e:
print(f"I/O error writing register 0x{reg_addr:04x}: {e}")
self.handle_error(f"I/O error writing register 0x{reg_addr:04x}: {e}")
return False

def handle_error(self, error_msg: str):
"""Handle errors and implement recovery logic."""
current_time = time.time()
self.consecutive_errors += 1
self.error_count += 1

print(f"Error: {error_msg}")
print(f"Consecutive errors: {self.consecutive_errors}")
print(f"Total errors: {self.error_count}")

if self.consecutive_errors >= self.MAX_RETRIES:
if current_time - self.last_error_time > self.ERROR_TIMEOUT:
print("Attempting sensor reset and recalibration...")
self.reset_and_recalibrate()
self.consecutive_errors = 0
else:
raise SensorError(f"Too many consecutive errors: {error_msg}")

self.last_error_time = current_time
time.sleep(self.RETRY_DELAY)

def wait_not_busy(self):
"""Wait until the detector is not busy."""
if DEBUG:
Expand Down Expand Up @@ -184,110 +231,133 @@ def begin(self, start_mm=250, end_mm=3000):

return status is not None and status >= 0

def measure(self):
"""Perform a distance measurement and return the results."""
if DEBUG:
print("\nStarting measurement...")
def reset_and_recalibrate(self):
"""Reset the sensor and recalibrate."""
try:
# Reset the module
self._write_register(self.REG_COMMAND, self.CMD_RESET_MODULE)
time.sleep(1.0) # Wait for reset

# Start measurement
if not self._write_register(self.REG_COMMAND, self.CMD_MEASURE_DISTANCE):
if DEBUG:
print("Failed to start measurement")
return []
# Clear averaging buffers
self.distance_avg.reset()
self.strength_avg.reset()

self.wait_not_busy()
# Reinitialize
if not self.begin():
raise SensorError("Failed to reinitialize sensor after reset")

print("Sensor reset and recalibration successful")
self.state = SensorState.INITIALIZED

# Read result
except Exception as e:
self.state = SensorState.ERROR
raise SensorError(f"Reset and recalibration failed: {str(e)}")

def check_calibration(self) -> bool:
"""Check if sensor needs calibration and perform if needed."""
result = self._read_register(self.REG_DISTANCE_RESULT)
if result is None:
if DEBUG:
print("Failed to read measurement result")
return []
return False

if DEBUG:
print(f"Measurement result: 0x{result:08x}")
if result & self.DISTANCE_RESULT_CALIBRATION_NEEDED:
print("Calibration needed, recalibrating...")
self._write_register(self.REG_COMMAND, self.CMD_RECALIBRATE)
time.sleep(0.5)
self.state = SensorState.INITIALIZED
return True

if result & self.DISTANCE_RESULT_MEASURE_DISTANCE_ERROR:
print("Error: Measurement error")
return []
return True

if result & self.DISTANCE_RESULT_CALIBRATION_NEEDED:
print("Error: Calibration needed")
return []
def measure(self) -> List[Dict[str, Tuple[float, float]]]:
"""Perform a distance measurement with error checking and recovery."""
try:
if self.state == SensorState.ERROR:
self.reset_and_recalibrate()

num_distances = (result & self.DISTANCE_RESULT_NUM_DISTANCES_MASK)
if not self.check_calibration():
return []

if DEBUG:
print(f"Number of distances detected: {num_distances}")
# Start measurement
if not self._write_register(self.REG_COMMAND, self.CMD_MEASURE_DISTANCE):
return []

peaks_with_average = []
self.wait_not_busy()

for i in range(num_distances):
if DEBUG:
print(f"\nReading peak {i}:")
# Read result
result = self._read_register(self.REG_DISTANCE_RESULT)
if result is None:
return []

distance = self._read_register(self.REG_PEAK0_DISTANCE + i)
strength = self._read_register(self.REG_PEAK0_STRENGTH + i)
if result & self.DISTANCE_RESULT_MEASURE_DISTANCE_ERROR:
self.handle_error("Measurement error")
return []

if distance is not None and strength is not None:
# Add to moving averages
self.distance_avg.add(distance)
self.strength_avg.add(strength)
num_distances = (result & self.DISTANCE_RESULT_NUM_DISTANCES_MASK)
peaks_with_average = []

# Create tuple with both raw and averaged values
avg_distance = self.distance_avg.get_average()
avg_strength = self.strength_avg.get_average()
for i in range(num_distances):
distance = self._read_register(self.REG_PEAK0_DISTANCE + i)
strength = self._read_register(self.REG_PEAK0_STRENGTH + i)

peak_data = {
'raw': (distance, strength),
'averaged': (avg_distance, avg_strength) if self.distance_avg.is_valid() else None
}
if distance is not None and strength is not None:
self.distance_avg.add(distance)
self.strength_avg.add(strength)

peaks_with_average.append(peak_data)
peak_data = {
'raw': (distance, strength),
'averaged': (self.distance_avg.get_average(),
self.strength_avg.get_average()) if self.distance_avg.is_valid() else None
}
peaks_with_average.append(peak_data)

if DEBUG:
print(f" Raw Distance: {distance}mm, Strength: {strength}")
if self.distance_avg.is_valid():
print(f" Avg Distance: {avg_distance:.1f}mm, Strength: {avg_strength:.1f}")
# Reset consecutive errors on successful measurement
self.consecutive_errors = 0
return peaks_with_average

return peaks_with_average
except Exception as e:
self.handle_error(f"Measurement error: {str(e)}")
return []


def main():
# Initialize sensor
sensor = XM125(average_window=5) # 5-sample moving average
sensor = XM125(average_window=5)

try:
# Setup sensor with 1m to 5m range
if not sensor.begin(start_mm=1000, end_mm=5000):
print("Failed to initialize sensor")
return

print("Sensor initialized successfully")

# Continuous measurement loop
while True:
peaks = sensor.measure()
try:
peaks = sensor.measure()

if peaks:
for i, peak_data in enumerate(peaks):
raw_distance, raw_strength = peak_data['raw']
print(f"\nPeak {i}:")
print(f" Raw: Distance = {raw_distance}mm, Strength = {raw_strength}")
if peaks:
for i, peak_data in enumerate(peaks):
raw_distance, raw_strength = peak_data['raw']
print(f"\nPeak {i}:")
print(f" Raw: Distance = {raw_distance}mm, Strength = {raw_strength}")

if peak_data['averaged']:
avg_distance, avg_strength = peak_data['averaged']
print(f" Avg: Distance = {avg_distance:.1f}mm, Strength = {avg_strength:.1f}")
else:
print("No peaks detected")
if peak_data['averaged']:
avg_distance, avg_strength = peak_data['averaged']
print(f" Avg: Distance = {avg_distance:.1f}mm, Strength = {avg_strength:.1f}")
else:
print("No peaks detected")

time.sleep(0.5)
time.sleep(0.5)

except SensorError as e:
print(f"Sensor error: {e}")
print("Attempting recovery...")
time.sleep(1.0)
continue

except KeyboardInterrupt:
print("\nStopping measurements")
sensor.bus.close()
except Exception as e:
print(f"Error: {e}")
print(f"Fatal error: {e}")


if __name__ == "__main__":
Expand Down

0 comments on commit 4ad334a

Please sign in to comment.