Skip to content

Commit

Permalink
Add tests and fix lint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
LaStrada committed Jan 1, 2025
1 parent cf2c0f4 commit 459c403
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 51 deletions.
4 changes: 2 additions & 2 deletions airthings_ble/device_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def product_name(self) -> str:
return "Wave Radon"
if self in (
AirthingsDeviceType.WAVE_ENHANCE_EU,
AirthingsDeviceType.WAVE_ENHANCE_US
AirthingsDeviceType.WAVE_ENHANCE_US,
):
return "Wave Enhance"
return "Unknown"
Expand Down Expand Up @@ -128,7 +128,7 @@ def need_firmware_upgrade(self, version: str) -> bool:
"""Check if the device needs an update."""
if self in (
AirthingsDeviceType.WAVE_ENHANCE_EU,
AirthingsDeviceType.WAVE_ENHANCE_US
AirthingsDeviceType.WAVE_ENHANCE_US,
):
return self._wave_enhance_need_firmware_upgrade(version)

Expand Down
69 changes: 24 additions & 45 deletions airthings_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ def friendly_name(self) -> str:

class AirthingsFirmware: # pylint: disable=too-few-public-methods
"""Firmware information for the Airthings device."""

need_fw_upgrade = False
current_firmware = ""
needed_firmware = ""
Expand Down Expand Up @@ -651,7 +652,8 @@ async def _get_service_characteristics(
str(COMMAND_UUID_WAVE_ENHANCE_NOTIFY)
in (str(x.uuid) for x in service.characteristics)
)
and device.model in (
and device.model
in (
AirthingsDeviceType.WAVE_ENHANCE_EU,
AirthingsDeviceType.WAVE_ENHANCE_US,
)
Expand All @@ -664,16 +666,11 @@ async def _wave_sensor_data(self, client, device, sensors, service):
for characteristic in service.characteristics:
uuid = characteristic.uuid
uuid_str = str(uuid)
if (
uuid in sensors_characteristics_uuid_str
and uuid_str in sensor_decoders
):
if uuid in sensors_characteristics_uuid_str and uuid_str in sensor_decoders:
try:
data = await client.read_gatt_char(characteristic)
except BleakError as err:
self.logger.debug(
"Get service characteristics exception: %s", err
)
self.logger.debug("Get service characteristics exception: %s", err)
continue

sensor_data = sensor_decoders[uuid_str](data)
Expand All @@ -688,15 +685,11 @@ async def _wave_sensor_data(self, client, device, sensors, service):
if (d := sensor_data.get("radon_1day_avg")) is not None:
sensors["radon_1day_level"] = get_radon_level(float(d))
if not self.is_metric:
sensors["radon_1day_avg"] = (
float(d) * BQ_TO_PCI_MULTIPLIER
)
sensors["radon_1day_avg"] = float(d) * BQ_TO_PCI_MULTIPLIER
if (d := sensor_data.get("radon_longterm_avg")) is not None:
sensors["radon_longterm_level"] = get_radon_level(float(d))
if not self.is_metric:
sensors["radon_longterm_avg"] = (
float(d) * BQ_TO_PCI_MULTIPLIER
)
sensors["radon_longterm_avg"] = float(d) * BQ_TO_PCI_MULTIPLIER

if uuid_str in command_decoders:
decoder = command_decoders[uuid_str]
Expand All @@ -705,9 +698,7 @@ async def _wave_sensor_data(self, client, device, sensors, service):
# Set up the notification handlers
await client.start_notify(characteristic, command_data_receiver)
# send command to this 'indicate' characteristic
await client.write_gatt_char(
characteristic, bytearray(decoder.cmd)
)
await client.write_gatt_char(characteristic, bytearray(decoder.cmd))
# Wait for up to one second to see if a callback comes in.
try:
await command_data_receiver.wait_for_message(5)
Expand All @@ -720,9 +711,7 @@ async def _wave_sensor_data(self, client, device, sensors, service):
if command_sensor_data is not None:
new_values: dict[str, float | str | None] = {}

if (
bat_data := command_sensor_data.get("battery")
) is not None:
if (bat_data := command_sensor_data.get("battery")) is not None:
new_values["battery"] = device.model.battery_percentage(
float(bat_data)
)
Expand All @@ -735,31 +724,25 @@ async def _wave_sensor_data(self, client, device, sensors, service):
# Stop notification handler
await client.stop_notify(characteristic)

async def _wave_enhance_sensor_data(
self, client, device, sensors, service
) -> None:
if self.device_info.model.need_firmware_upgrade(
self.device_info.sw_version
):
async def _wave_enhance_sensor_data(self, client, device, sensors, service) -> None:
if self.device_info.model.need_firmware_upgrade(self.device_info.sw_version):
self.logger.warning(
"The firmware for this Wave Enhance is not up to date, "
"please update to 2.6.1 or newer using the Airthings app."
)
"The firmware for this Wave Enhance is not up to date, "
"please update to 2.6.1 or newer using the Airthings app."
)
device.firmware = AirthingsFirmware(
need_fw_upgrade=True,
current_firmware=device.sw_version,
needed_firmware="2.6.1",
)
need_fw_upgrade=True,
current_firmware=device.sw_version,
needed_firmware="2.6.1",
)
return

decoder = command_decoders[str(COMMAND_UUID_WAVE_ENHANCE)]

command_data_receiver = decoder.make_data_receiver()

atom_write = service.get_characteristic(COMMAND_UUID_WAVE_ENHANCE)
atom_notify = service.get_characteristic(
COMMAND_UUID_WAVE_ENHANCE_NOTIFY
)
atom_notify = service.get_characteristic(COMMAND_UUID_WAVE_ENHANCE_NOTIFY)

# Set up the notification handlers
await client.start_notify(atom_notify, command_data_receiver)
Expand All @@ -773,17 +756,15 @@ async def _wave_enhance_sensor_data(
self.logger.warning("Timeout getting command data.")

command_sensor_data = decoder.decode_data(
logger=self.logger,
raw_data=command_data_receiver.message,
)
logger=self.logger,
raw_data=command_data_receiver.message,
)

if command_sensor_data is not None:
new_values: dict[str, float | str | None] = {}

if (bat_data := command_sensor_data.get("BAT")) is not None:
new_values["battery"] = device.model.battery_percentage(
float(bat_data)
)
new_values["battery"] = device.model.battery_percentage(float(bat_data))

if (lux := command_sensor_data.get("LUX")) is not None:
new_values["lux"] = lux
Expand All @@ -799,9 +780,7 @@ async def _wave_enhance_sensor_data(

if (temperature := command_sensor_data.get("TMP")) is not None:
# Temperature reported as kelvin
new_values["temperature"] = round(
temperature / 100.0 - 273.15, 2
)
new_values["temperature"] = round(temperature / 100.0 - 273.15, 2)

if (noise := command_sensor_data.get("NOI")) is not None:
new_values["noise"] = noise
Expand Down
9 changes: 7 additions & 2 deletions airthings_ble/wave_enhance/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ class WaveEnhanceRequest:
url: WaveEnhanceRequestPath
random_bytes: bytes

def __init__(self, url: WaveEnhanceRequestPath):
def __init__(
self, url: WaveEnhanceRequestPath, random_bytes: bytes | None = None
) -> None:
self.url = url
self.random_bytes = os.urandom(2)
if random_bytes is not None:
self.random_bytes = random_bytes
else:
self.random_bytes = os.urandom(2)

def as_bytes(self) -> bytes:
"""Get request as bytes"""
Expand Down
6 changes: 4 additions & 2 deletions tests/test_device_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ def test_device_type():
assert AirthingsDeviceType.from_raw_value("2930") == AirthingsDeviceType.WAVE_PLUS
assert AirthingsDeviceType.from_raw_value("2950") == AirthingsDeviceType.WAVE_RADON
assert (
AirthingsDeviceType.from_raw_value("3210") == AirthingsDeviceType.WAVE_ENHANCE_EU
AirthingsDeviceType.from_raw_value("3210")
== AirthingsDeviceType.WAVE_ENHANCE_EU
)
assert (
AirthingsDeviceType.from_raw_value("3220") == AirthingsDeviceType.WAVE_ENHANCE_US
AirthingsDeviceType.from_raw_value("3220")
== AirthingsDeviceType.WAVE_ENHANCE_US
)

unknown_device = AirthingsDeviceType.from_raw_value("1234")
Expand Down
50 changes: 50 additions & 0 deletions tests/test_wave_enhance_request_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import logging
from airthings_ble.wave_enhance.request import (
WaveEnhanceRequest,
WaveEnhanceRequestPath,
WaveEnhanceResponse,
)

_LOGGER = logging.getLogger(__name__)


def test_wave_enhance_request():
"""Test the Wave Enhance request."""
request = WaveEnhanceRequest(url=WaveEnhanceRequestPath.LATEST_VALUES)
assert request.url == WaveEnhanceRequestPath.LATEST_VALUES
assert len(request.random_bytes) == 2


def test_wave_enhance_request_response():
"""Test the Wave Enhance request."""
request = WaveEnhanceRequest(
url=WaveEnhanceRequestPath.LATEST_VALUES, random_bytes=bytes.fromhex("E473")
)

assert request.as_bytes()[0:2] == bytes.fromhex("0301")
assert request.as_bytes()[2:4] == request.random_bytes
assert request.as_bytes()[4:8] == bytes.fromhex("81A1006D")
assert request.as_bytes()[8:] == request.url.as_bytes()

response = WaveEnhanceResponse(
logger=_LOGGER,
response=bytes.fromhex(
"1001000345e47381a2006d32393939392f302f333130313202583ea9634e4f49"
+ "182763544d501972f06348554d190d2f63434f321902dc63564f43190115634c5"
+ "55801635052531a005f364663424154190b346354494d1876"
),
random_bytes=request.random_bytes,
path=WaveEnhanceRequestPath.LATEST_VALUES,
)

sensor_data = response.parse()

assert sensor_data["TMP"] == 29424
assert sensor_data["HUM"] == 3375
assert sensor_data["CO2"] == 732
assert sensor_data["VOC"] == 277
assert sensor_data["LUX"] == 1
assert sensor_data["PRS"] == 6239814
assert sensor_data["BAT"] == 2868
assert sensor_data["TIM"] == 118
assert sensor_data["NOI"] == 39

0 comments on commit 459c403

Please sign in to comment.