This repository has been archived by the owner on Sep 11, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
First stable version. Works with ESP32 v0.0.1
- Loading branch information
Showing
10 changed files
with
449 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
"""The Format BLE Tracker integration.""" | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
from curses import has_key | ||
import json | ||
import logging | ||
from typing import Any | ||
|
||
import voluptuous as vol | ||
|
||
from homeassistant.components import mqtt | ||
from homeassistant.config_entries import ConfigEntry | ||
from homeassistant.const import Platform | ||
from homeassistant.core import HomeAssistant, callback | ||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator | ||
|
||
from .const import ( | ||
ALIVE_NODES_TOPIC, | ||
DOMAIN, | ||
MAC, | ||
NAME, | ||
ROOM, | ||
ROOT_TOPIC, | ||
RSSI, | ||
) | ||
|
||
PLATFORMS: list[Platform] = [ | ||
Platform.DEVICE_TRACKER, | ||
Platform.SENSOR, | ||
Platform.NUMBER | ||
] | ||
_LOGGER = logging.getLogger(__name__) | ||
|
||
MQTT_PAYLOAD = vol.Schema( | ||
vol.All( | ||
json.loads, | ||
vol.Schema( | ||
{ | ||
vol.Required(RSSI): vol.Coerce(int), | ||
}, | ||
extra=vol.ALLOW_EXTRA, | ||
), | ||
) | ||
) | ||
|
||
|
||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: | ||
"""Set up Format BLE Tracker from a config entry.""" | ||
|
||
hass.data.setdefault(DOMAIN, {}) | ||
|
||
coordinator = BeaconCoordinator(hass, entry.data) | ||
|
||
mac = entry.data[MAC] | ||
state_topic = ROOT_TOPIC + "/" + mac + "/+" | ||
_LOGGER.info("Subscribing to %s", state_topic) | ||
await mqtt.async_subscribe(hass, state_topic, coordinator.message_received, 1) | ||
alive_topic = ALIVE_NODES_TOPIC + "/" + mac | ||
_LOGGER.info("Notifying alive to %s", alive_topic) | ||
await mqtt.async_publish(hass, alive_topic, True, 1, retain=True) | ||
|
||
hass.data[DOMAIN][entry.entry_id] = coordinator | ||
hass.config_entries.async_setup_platforms(entry, PLATFORMS) | ||
|
||
return True | ||
|
||
|
||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: | ||
"""Unload a config entry.""" | ||
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): | ||
hass.data[DOMAIN].pop(entry.entry_id) | ||
|
||
mac = entry.data[MAC] | ||
alive_topic = ALIVE_NODES_TOPIC + "/" + mac | ||
_LOGGER.info("Notifying alive to %s", alive_topic) | ||
await mqtt.async_publish(hass, alive_topic, "", 1, retain=True) | ||
|
||
return unload_ok | ||
|
||
|
||
class BeaconCoordinator(DataUpdateCoordinator[dict[str, Any]]): | ||
"""Class to arrange interaction with MQTT""" | ||
|
||
def __init__(self, hass: HomeAssistant, data) -> None: | ||
self.mac = data[MAC] | ||
self.expiration_time : int | ||
self.default_expiration_time : int = 2 | ||
given_name = data[NAME] if data.__contains__(NAME) else self.mac | ||
self.room_data = dict[str, int]() | ||
self.room_expiration_timers = dict[str, asyncio.TimerHandle]() | ||
self.room = None | ||
|
||
super().__init__(hass, _LOGGER, name=given_name) | ||
|
||
async def _async_update_data(self) -> dict[str, Any]: | ||
"""Update data via library.""" | ||
_LOGGER.error("Room data: %s", str(self.room_data)) | ||
if len(self.room_data) == 0: | ||
self.room = None | ||
else: | ||
self.room = next( | ||
iter( | ||
dict( | ||
sorted( | ||
self.room_data.items(), | ||
key=lambda item: item[1], | ||
reverse=True, | ||
) | ||
) | ||
) | ||
) | ||
return {**{ROOM: self.room}} | ||
|
||
async def subscribe_to_mqtt(self) -> None: | ||
"""Subscribe coordinator to MQTT messages""" | ||
|
||
@callback | ||
async def message_received(self, msg): | ||
"""Handle new MQTT messages.""" | ||
try: | ||
data = MQTT_PAYLOAD(msg.payload) | ||
except vol.MultipleInvalid as error: | ||
_LOGGER.debug("Skipping update because of malformatted data: %s", error) | ||
return | ||
room_topic = msg.topic.split("/")[2] | ||
|
||
await self.schedule_data_expiration(room_topic) | ||
self.room_data[room_topic] = data.get(RSSI) | ||
await self.async_refresh() | ||
|
||
async def schedule_data_expiration(self, room): | ||
"""Start timer for data expiration for certain room""" | ||
if room in self.room_expiration_timers: | ||
self.room_expiration_timers[room].cancel() | ||
loop = asyncio.get_event_loop() | ||
timer = loop.call_later( | ||
(self.expiration_time if self.expiration_time else self.default_expiration_time) * 60, | ||
lambda: asyncio.ensure_future(self.expire_data(room)), | ||
) | ||
self.room_expiration_timers[room] = timer | ||
|
||
async def expire_data(self, room): | ||
"""Set data for certain room expired""" | ||
del self.room_data[room] | ||
del self.room_expiration_timers[room] | ||
await self.async_refresh() | ||
|
||
async def on_expiration_time_changed(self, new_time : int): | ||
"""Respond to expiration time changed by user""" | ||
if new_time is None: | ||
return | ||
self.expiration_time = new_time | ||
for room in self.room_expiration_timers.keys(): | ||
await self.schedule_data_expiration(room) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
"""Common values""" | ||
from homeassistant.helpers.device_registry import format_mac | ||
from .const import DOMAIN | ||
from .__init__ import BeaconCoordinator | ||
from homeassistant.helpers.update_coordinator import CoordinatorEntity | ||
|
||
class BeaconDeviceEntity(CoordinatorEntity[BeaconCoordinator]): | ||
"""Base device class""" | ||
|
||
def __init__(self, coordinator: BeaconCoordinator) -> None: | ||
"""Initialize.""" | ||
super().__init__(coordinator) | ||
self.formatted_mac_address = format_mac(coordinator.mac) | ||
|
||
@property | ||
def device_info(self): | ||
return { | ||
"identifiers": { | ||
# MAC addresses are unique identifiers within a specific domain | ||
(DOMAIN, self.formatted_mac_address) | ||
}, | ||
"name": self.coordinator.name, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
"""Config flow for Format BLE Tracker integration.""" | ||
from __future__ import annotations | ||
|
||
import re | ||
from typing import Any | ||
|
||
import voluptuous as vol | ||
|
||
from homeassistant import config_entries | ||
from homeassistant.data_entry_flow import FlowResult | ||
|
||
from .const import DOMAIN, MAC, MAC_REGEX, UUID_REGEX, NAME | ||
|
||
|
||
STEP_USER_DATA_SCHEMA = vol.Schema( | ||
{ | ||
vol.Required(MAC): str, | ||
vol.Optional(NAME): str, | ||
} | ||
) | ||
|
||
|
||
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): | ||
"""Handle a config flow for Format BLE Tracker.""" | ||
|
||
VERSION = 1 | ||
|
||
async def async_step_user( | ||
self, user_input: dict[str, Any] | None = None | ||
) -> FlowResult: | ||
"""Handle the initial step.""" | ||
if user_input is None: | ||
return self.async_show_form( | ||
step_id="user", data_schema=STEP_USER_DATA_SCHEMA | ||
) | ||
mac = user_input[MAC].strip().upper() | ||
if not re.match(MAC_REGEX, mac) and not re.match(UUID_REGEX, mac): | ||
return self.async_abort(reason="not_id") | ||
await self.async_set_unique_id(mac) | ||
self._abort_if_unique_id_configured() | ||
|
||
given_name = user_input[NAME] if NAME in user_input else mac | ||
|
||
return self.async_create_entry( | ||
title=given_name, data={MAC: mac, NAME: given_name} | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
"""Constants for the Format BLE Tracker integration.""" | ||
|
||
DOMAIN = "format_ble_tracker" | ||
|
||
MAC = "mac" | ||
NAME = "name" | ||
SIXTEENTH_REGEX = "[0-9A-F]" | ||
MAC_REGEX = "^([0-9A-F]{2}[:]){5}([0-9A-F]{2})$" | ||
UUID_REGEX = "^[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}$" | ||
|
||
ROOM = "room" | ||
ROOT_TOPIC = "format_ble_tracker" | ||
ALIVE_NODES_TOPIC = ROOT_TOPIC + "/alive" | ||
RSSI = "rssi" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
"""Device tracker implementation""" | ||
from homeassistant.components import device_tracker | ||
from homeassistant.components.device_tracker.config_entry import BaseTrackerEntity | ||
from homeassistant.config_entries import ConfigEntry | ||
from homeassistant.const import STATE_HOME, STATE_NOT_HOME | ||
from homeassistant.core import HomeAssistant, callback | ||
from homeassistant.helpers.entity_platform import AddEntitiesCallback | ||
|
||
from .common import BeaconDeviceEntity | ||
from .__init__ import BeaconCoordinator | ||
from .const import DOMAIN | ||
|
||
|
||
async def async_setup_entry( | ||
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback | ||
) -> None: | ||
"""Add device tracker entities from a config_entry.""" | ||
|
||
coordinator: BeaconCoordinator = hass.data[DOMAIN][entry.entry_id] | ||
async_add_entities([BleDeviceTracker(coordinator)], True) | ||
|
||
|
||
class BleDeviceTracker(BeaconDeviceEntity, BaseTrackerEntity): | ||
"""Define an device tracker entity.""" | ||
|
||
_attr_should_poll = False | ||
|
||
def __init__(self, coordinator: BeaconCoordinator) -> None: | ||
"""Initialize.""" | ||
super().__init__(coordinator) | ||
self._attr_name = coordinator.name + " tracker" | ||
self._attr_unique_id = self.formatted_mac_address + "_tracker" | ||
self.entity_id = f"{device_tracker.DOMAIN}.{self._attr_unique_id}" | ||
|
||
@property | ||
def source_type(self) -> str: | ||
"""Return the source type, eg gps or router, of the device.""" | ||
return "bluetooth_le" | ||
|
||
@property | ||
def state(self) -> str: | ||
"""Return the state of the device.""" | ||
if self.coordinator.room is None: | ||
return STATE_NOT_HOME | ||
return STATE_HOME | ||
|
||
@callback | ||
def _handle_coordinator_update(self) -> None: | ||
"""Handle data update.""" | ||
self.async_write_ha_state() | ||
|
||
async def async_added_to_hass(self) -> None: | ||
"""Subscribe to MQTT events.""" | ||
# await self.coordinator.async_on_entity_added_to_ha() | ||
return await super().async_added_to_hass() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
{ | ||
"domain": "format_ble_tracker", | ||
"name": "Format BLE Tracker", | ||
"version": "0.0.1", | ||
"config_flow": true, | ||
"documentation": "https://www.home-assistant.io/integrations/format_ble_tracker", | ||
"requirements": [], | ||
"ssdp": [], | ||
"zeroconf": [], | ||
"homekit": {}, | ||
"dependencies": ["mqtt"], | ||
"codeowners": ["@formatBCE"], | ||
"iot_class": "local_push" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
"""Expiration setter implementation""" | ||
from homeassistant.components import input_number | ||
from homeassistant.components.number import NumberEntity, NumberMode, RestoreNumber | ||
from homeassistant.config_entries import ConfigEntry | ||
from homeassistant.core import HomeAssistant, callback | ||
from homeassistant.helpers.entity_platform import AddEntitiesCallback | ||
|
||
from .common import BeaconDeviceEntity | ||
from .__init__ import BeaconCoordinator | ||
from .const import DOMAIN | ||
|
||
|
||
async def async_setup_entry( | ||
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback | ||
) -> None: | ||
"""Add sensor entities from a config_entry.""" | ||
|
||
coordinator: BeaconCoordinator = hass.data[DOMAIN][entry.entry_id] | ||
async_add_entities([BleDataExpirationNumber(coordinator)], True) | ||
|
||
|
||
class BleDataExpirationNumber(BeaconDeviceEntity, RestoreNumber, NumberEntity): | ||
"""Define an room sensor entity.""" | ||
|
||
_attr_should_poll = False | ||
|
||
def __init__(self, coordinator: BeaconCoordinator) -> None: | ||
"""Initialize.""" | ||
super().__init__(coordinator) | ||
self._attr_name = coordinator.name + " expiration delay" | ||
self._attr_mode = NumberMode.SLIDER | ||
self._attr_native_unit_of_measurement = "min" | ||
self._attr_native_max_value = 10 | ||
self._attr_native_min_value = 1 | ||
self._attr_native_step = 1 | ||
self._attr_unique_id = self.formatted_mac_address + "_expiration" | ||
self.entity_id = f"{input_number.DOMAIN}.{self._attr_unique_id}" | ||
|
||
async def async_added_to_hass(self): | ||
"""Entity has been added to hass, restoring state""" | ||
restored = await self.async_get_last_number_data() | ||
native_value = 2 if restored is None else restored.native_value | ||
self._attr_native_value = native_value | ||
await self.coordinator.on_expiration_time_changed(native_value) | ||
self.async_write_ha_state() | ||
|
||
async def async_set_native_value(self, value: float) -> None: | ||
"""Update the current value.""" | ||
val = min(10, max(1, int(value))) | ||
self._attr_native_value = val | ||
await self.coordinator.on_expiration_time_changed(val) | ||
|
Oops, something went wrong.