Skip to content

INFRA-388 Converting smartmon into python and adding mock tests #1327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: stackhpc/2024.1
Choose a base branch
from
226 changes: 226 additions & 0 deletions etc/kayobe/ansible/scripts/smartmon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
#!/usr/bin/env python3

import subprocess
import json
import re
import datetime

from pySMART import DeviceList

# Absolute path of the smartctl binary invoked by main().
SMARTCTL_PATH = "/usr/sbin/smartctl"

# Allow-list of snake_case attribute names. parse_if_attributes() only emits
# a metric for an attribute whose converted name appears in this set.
SMARTMON_ATTRS = {
    # NOTE(review): these look like ATA/SATA SMART attribute names - confirm.
    "airflow_temperature_cel",
    "command_timeout",
    "current_pending_sector",
    "end_to_end_error",
    "erase_fail_count",
    "g_sense_error_rate",
    "hardware_ecc_recovered",
    "host_reads_32mib",
    "host_reads_mib",
    "host_writes_32mib",
    "host_writes_mib",
    "load_cycle_count",
    "media_wearout_indicator",
    "nand_writes_1gib",
    "offline_uncorrectable",
    "power_cycle_count",
    "power_on_hours",
    "program_fail_cnt_total",
    "program_fail_count",
    "raw_read_error_rate",
    "reallocated_event_count",
    "reallocated_sector_ct",
    "reported_uncorrect",
    "runtime_bad_block",
    "sata_downshift_count",
    "seek_error_rate",
    "spin_retry_count",
    "spin_up_time",
    "start_stop_count",
    "temperature_case",
    "temperature_celsius",
    "temperature_internal",
    "total_lbas_read",
    "total_lbas_written",
    "udma_crc_error_count",
    "unsafe_shutdown_count",
    "unused_rsvd_blk_cnt_tot",
    "wear_leveling_count",
    "workld_host_reads_perc",
    "workld_media_wear_indic",
    "workload_minutes",
    # NOTE(review): presumably NVMe health-log field names - confirm against
    # the attribute names pySMART exposes.
    "critical_warning",
    "temperature",
    "available_spare",
    "available_spare_threshold",
    "percentage_used",
    "data_units_read",
    "data_units_written",
    "host_reads",
    "host_writes",
    "controller_busy_time",
    "power_cycles",
    "unsafe_shutdowns",
    "media_errors",
    "num_err_log_entries",
    "warning_temp_time",
    "critical_comp_time",
}

def run_command(command, parse_json=False):
    """
    Execute a command as a subprocess and return what it wrote to stdout.

    Args:
        command (list[str]): Program and arguments, handed to subprocess.run.
        parse_json (bool): When true, decode stdout as JSON instead of
            returning it as text.

    Returns:
        The decoded JSON document when parse_json is set, otherwise stdout
        with leading/trailing whitespace removed.

    Raises:
        json.JSONDecodeError: If parse_json is set and stdout is not valid JSON.
    """
    # The exit status and stderr are captured but never inspected; only
    # stdout is used.
    completed = subprocess.run(command, capture_output=True, text=True)
    stdout_text = completed.stdout
    return json.loads(stdout_text) if parse_json else stdout_text.strip()

def camel_to_snake(name):
    """
    Turn a CamelCase (or camelCase) identifier into snake_case.

    An underscore is inserted in front of every ASCII capital letter except
    one in the very first position, and the result is lower-cased.

    Reference: https://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-snake-case
    """
    pieces = []
    for index, char in enumerate(name):
        # index == 0 is skipped so a leading capital gets no underscore.
        if index and "A" <= char <= "Z":
            pieces.append("_")
        pieces.append(char)
    return "".join(pieces).lower()

def parse_device_info(device):
    """
    Produce Prometheus lines describing the device's identity and SMART status:
    - device_info
    - device_smart_available
    - device_smart_enabled   (only when the device is SMART capable)
    - device_smart_healthy   (only when SMART capable and an assessment exists)

    Args:
        device (Device): A pySMART Device object; the attributes read here are
            name, interface, vendor, family, model, serial, firmware,
            smart_capable, smart_enabled and assessment.

    Returns:
        List[str]: A list of Prometheus formatted metric strings.
    """
    disk = device.name
    # Normalise the interface once so every metric carries the same label
    # value; interpolating a missing interface directly would render the
    # literal text type="None" and disagree with the device_info labels.
    disk_type = device.interface or ""
    serial_number = (device.serial or "").lower()

    info_labels = {
        "disk": disk,
        "type": disk_type,
        "vendor": device.vendor or "",
        "model_family": device.family or "",
        "device_model": device.model or "",
        "serial_number": serial_number,
        "firmware_version": device.firmware or "",
    }
    label_str = ",".join(f'{k}="{v}"' for k, v in info_labels.items())
    # Label set shared by the per-device status gauges.
    status_labels = f'disk="{disk}",type="{disk_type}",serial_number="{serial_number}"'

    metrics = [
        f"device_info{{{label_str}}} 1",
        f"device_smart_available{{{status_labels}}} {1 if device.smart_capable else 0}",
    ]

    if device.smart_capable:
        metrics.append(
            f"device_smart_enabled{{{status_labels}}} {1 if device.smart_enabled else 0}"
        )
        if device.assessment:
            # Any assessment other than PASS (case-insensitive) is unhealthy.
            is_healthy = 1 if device.assessment.upper() == "PASS" else 0
            metrics.append(
                f"device_smart_healthy{{{status_labels}}} {is_healthy}"
            )

    return metrics

def parse_if_attributes(device):
    """
    Build metric lines from the interface-specific attributes of a device.

    For any device type (ATA, NVMe, SCSI, etc.) this reads
    device.if_attributes, walks its public non-callable attributes, converts
    each name to snake_case, and emits a metric when the converted name is in
    SMARTMON_ATTRS and the value is numeric.

    Args:
        device (Device): A pySMART Device object; the attributes read here are
            name, interface, serial and if_attributes.

    Returns:
        List[str]: Lines of the form 'name{disk=...,type=...,serial_number=...} value'.
        Empty when the device has no if_attributes.
    """
    metrics = []

    attributes = device.if_attributes
    if not attributes:
        return metrics

    disk = device.name
    disk_type = device.interface or ""
    serial_number = (device.serial or "").lower()
    labels = f'disk="{disk}",type="{disk_type}",serial_number="{serial_number}"'

    # Inspect all public attributes on device.if_attributes
    for attr_name in dir(attributes):
        if attr_name.startswith("_"):
            continue  # skip private / special members
        val = getattr(attributes, attr_name, None)
        if callable(val):
            continue  # skip methods

        snake_name = camel_to_snake(attr_name)
        if snake_name not in SMARTMON_ATTRS or not isinstance(val, (int, float)):
            continue

        # bool is a subclass of int but formats as "True"/"False", which is
        # not a valid Prometheus sample value; emit it as 1/0 instead.
        if isinstance(val, bool):
            val = int(val)
        metrics.append(f"{snake_name}{{{labels}}} {val}")

    return metrics

def format_output(metrics):
    """
    Render raw metric lines as Prometheus text exposition output.

    Each input line looks like "some_metric{...} value". Lines are sorted,
    every line is prefixed with "smartmon_", and a "# HELP" / "# TYPE" pair
    is written ahead of the first line of each distinct metric name.

    Args:
        metrics (list[str]): Unprefixed metric lines.

    Returns:
        str: The newline-joined exposition text (no trailing newline).
    """
    # TODO(review): review feedback on this function asks to switch to the
    # prometheus_client library (CollectorRegistry, Gauge, write_to_textfile),
    # writing directly to a file for scraping so the library handles the
    # metric formatting; the reviewer's example reads the output path from
    # the SMART_METRIC_OUTPUT_PATH environment variable.
    # See https://prometheus.github.io/client_python/exporting/textfile/
    rendered = []
    previous_name = ""
    for line in sorted(metrics):
        # Everything before the first "{" is the metric name.
        current_name = line.partition("{")[0]
        if current_name != previous_name:
            rendered.extend(
                (
                    f"# HELP smartmon_{current_name} SMART metric {current_name}",
                    f"# TYPE smartmon_{current_name} gauge",
                )
            )
            previous_name = current_name
        rendered.append(f"smartmon_{line}")
    return "\n".join(rendered)

def main():
    """
    Collect SMART metrics for every device pySMART discovers and print them.

    Steps:
    1. Record the smartctl version (or "unknown" if it cannot be determined).
    2. For each device: record a run timestamp, ask smartctl whether the disk
       is in standby, and record device_active.
    3. For active devices, add the identity/status metrics and the
       interface-specific attribute metrics.
    4. Print everything in Prometheus text format on stdout.
    """
    all_metrics = []

    try:
        version_output = run_command([SMARTCTL_PATH, "--version"])
        if version_output.startswith("smartctl"):
            first_line = version_output.splitlines()[0]
            version_num = first_line.split()[1]
        else:
            version_num = "unknown"
    except Exception:
        # Best effort: a missing binary or unexpected banner must not stop
        # metric collection.
        version_num = "unknown"
    all_metrics.append(f'smartctl_version{{version="{version_num}"}} 1')

    dev_list = DeviceList()

    for dev in dev_list.devices:
        disk_name = dev.name
        disk_type = dev.interface or ""
        serial_number = (dev.serial or "").lower()

        # datetime.timezone.utc instead of datetime.UTC: the latter only
        # exists on Python 3.11+ and raises AttributeError on older versions.
        run_timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
        all_metrics.append(f'smartctl_run{{disk="{disk_name}",type="{disk_type}"}} {run_timestamp}')

        active = 1
        try:
            # NOTE(review): confirm dev.name is a device path smartctl
            # accepts; if pySMART reports a bare name such as "sda" this
            # probe cannot open the device.
            cmd = [SMARTCTL_PATH, "-n", "standby", "-d", disk_type, "-j", disk_name]
            standby_json = run_command(cmd, parse_json=True)
            if standby_json.get("power_mode", "") == "standby":
                active = 0
        except Exception:
            # Covers json.JSONDecodeError as well: any failure to probe the
            # device is treated as "not active" and the device is skipped.
            active = 0

        all_metrics.append(
            f'device_active{{disk="{disk_name}",type="{disk_type}",serial_number="{serial_number}"}} {active}'
        )
        if active == 0:
            continue

        all_metrics.extend(parse_device_info(dev))
        all_metrics.extend(parse_if_attributes(dev))

    print(format_output(all_metrics))


# Run the collector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Loading
Loading