Skip to content

Commit

Permalink
Fix linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
LaStrada committed Jan 1, 2025
1 parent 9796968 commit cf2c0f4
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 130 deletions.
12 changes: 6 additions & 6 deletions airthings_ble/device_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def product_name(self) -> str:
return "Wave Plus"
if self == AirthingsDeviceType.WAVE_RADON:
return "Wave Radon"
if (
self == AirthingsDeviceType.WAVE_ENHANCE_EU
or self == AirthingsDeviceType.WAVE_ENHANCE_US
if self in (
AirthingsDeviceType.WAVE_ENHANCE_EU,
AirthingsDeviceType.WAVE_ENHANCE_US
):
return "Wave Enhance"
return "Unknown"
Expand Down Expand Up @@ -126,9 +126,9 @@ def _interpolate(

def need_firmware_upgrade(self, version: str) -> bool:
"""Check if the device needs an update."""
if (
self == AirthingsDeviceType.WAVE_ENHANCE_EU
or self == AirthingsDeviceType.WAVE_ENHANCE_US
if self in (
AirthingsDeviceType.WAVE_ENHANCE_EU,
AirthingsDeviceType.WAVE_ENHANCE_US
):
return self._wave_enhance_need_firmware_upgrade(version)

Expand Down
254 changes: 130 additions & 124 deletions airthings_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ def friendly_name(self) -> str:
return f"Airthings {self.model.product_name}"


class AirthingsFirmware:
class AirthingsFirmware: # pylint: disable=too-few-public-methods
"""Firmware information for the Airthings device."""
need_fw_upgrade = False
current_firmware = ""
needed_firmware = ""
Expand Down Expand Up @@ -640,12 +641,7 @@ async def _get_service_characteristics(
) -> None:
svcs = client.services
sensors = device.sensors
for service in svcs:
is_tern = (
device.model == AirthingsDeviceType.WAVE_ENHANCE_EU
or device.model == AirthingsDeviceType.WAVE_ENHANCE_US
)

for service in svcs: # pylint: disable=too-many-nested-blocks
if (
(
str(COMMAND_UUID_WAVE_ENHANCE)
Expand All @@ -655,160 +651,170 @@ async def _get_service_characteristics(
str(COMMAND_UUID_WAVE_ENHANCE_NOTIFY)
in (str(x.uuid) for x in service.characteristics)
)
and is_tern
and device.model in (
AirthingsDeviceType.WAVE_ENHANCE_EU,
AirthingsDeviceType.WAVE_ENHANCE_US,
)
):
if self.device_info.model.need_firmware_upgrade(
self.device_info.sw_version
):
self.logger.warning(
"The firmware for this Wave Enhance is not up to date, "
"please update to 2.6.1 or newer using the Airthings app."
)
device.firmware = AirthingsFirmware(
need_fw_upgrade=True,
current_firmware=device.sw_version,
needed_firmware="2.6.1",
await self._wave_enhance_sensor_data(client, device, sensors, service)
else:
await self._wave_sensor_data(client, device, sensors, service)

async def _wave_sensor_data(self, client, device, sensors, service):
for characteristic in service.characteristics:
uuid = characteristic.uuid
uuid_str = str(uuid)
if (
uuid in sensors_characteristics_uuid_str
and uuid_str in sensor_decoders
):
try:
data = await client.read_gatt_char(characteristic)
except BleakError as err:
self.logger.debug(
"Get service characteristics exception: %s", err
)
return
continue

decoder = command_decoders[str(COMMAND_UUID_WAVE_ENHANCE)]
sensor_data = sensor_decoders[uuid_str](data)

command_data_receiver = decoder.make_data_receiver()
# Skipping for now
if "date_time" in sensor_data:
sensor_data.pop("date_time")

atom_write = service.get_characteristic(COMMAND_UUID_WAVE_ENHANCE)
atom_notify = service.get_characteristic(
COMMAND_UUID_WAVE_ENHANCE_NOTIFY
)
sensors.update(sensor_data)

# Set up the notification handlers
await client.start_notify(atom_notify, command_data_receiver)
# Manage radon values
if (d := sensor_data.get("radon_1day_avg")) is not None:
sensors["radon_1day_level"] = get_radon_level(float(d))
if not self.is_metric:
sensors["radon_1day_avg"] = (
float(d) * BQ_TO_PCI_MULTIPLIER
)
if (d := sensor_data.get("radon_longterm_avg")) is not None:
sensors["radon_longterm_level"] = get_radon_level(float(d))
if not self.is_metric:
sensors["radon_longterm_avg"] = (
float(d) * BQ_TO_PCI_MULTIPLIER
)

if uuid_str in command_decoders:
decoder = command_decoders[uuid_str]
command_data_receiver = decoder.make_data_receiver()

# Set up the notification handlers
await client.start_notify(characteristic, command_data_receiver)
# send command to this 'indicate' characteristic
await client.write_gatt_char(atom_write, bytearray(decoder.cmd))
await client.write_gatt_char(
characteristic, bytearray(decoder.cmd)
)
# Wait for up to one second to see if a callback comes in.
try:
await command_data_receiver.wait_for_message(5)
except asyncio.TimeoutError:
self.logger.warning("Timeout getting command data.")

command_sensor_data = decoder.decode_data(
logger=self.logger,
raw_data=command_data_receiver.message,
logger=self.logger, raw_data=command_data_receiver.message
)

if command_sensor_data is not None:
new_values: dict[str, float | str | None] = {}

if (bat_data := command_sensor_data.get("BAT")) is not None:
if (
bat_data := command_sensor_data.get("battery")
) is not None:
new_values["battery"] = device.model.battery_percentage(
float(bat_data)
)

if (lux := command_sensor_data.get("LUX")) is not None:
new_values["lux"] = lux
if illuminance := command_sensor_data.get("illuminance"):
new_values["illuminance"] = illuminance

sensors.update(new_values)

# Stop notification handler
await client.stop_notify(characteristic)

if (co2 := command_sensor_data.get("CO2")) is not None:
new_values["co2"] = co2
async def _wave_enhance_sensor_data(
self, client, device, sensors, service
) -> None:
if self.device_info.model.need_firmware_upgrade(
self.device_info.sw_version
):
self.logger.warning(
"The firmware for this Wave Enhance is not up to date, "
"please update to 2.6.1 or newer using the Airthings app."
)
device.firmware = AirthingsFirmware(
need_fw_upgrade=True,
current_firmware=device.sw_version,
needed_firmware="2.6.1",
)
return

if (voc := command_sensor_data.get("VOC")) is not None:
new_values["voc"] = voc
decoder = command_decoders[str(COMMAND_UUID_WAVE_ENHANCE)]

if (hum := command_sensor_data.get("HUM")) is not None:
new_values["humidity"] = hum / 100.0
command_data_receiver = decoder.make_data_receiver()

if (temperature := command_sensor_data.get("TMP")) is not None:
# Temperature reported as kelvin
new_values["temperature"] = round(
temperature / 100.0 - 273.15, 2
)
atom_write = service.get_characteristic(COMMAND_UUID_WAVE_ENHANCE)
atom_notify = service.get_characteristic(
COMMAND_UUID_WAVE_ENHANCE_NOTIFY
)

if (noise := command_sensor_data.get("NOI")) is not None:
new_values["noise"] = noise
# Set up the notification handlers
await client.start_notify(atom_notify, command_data_receiver)

if (pressure := command_sensor_data.get("PRS")) is not None:
new_values["pressure"] = pressure / (64 * 100)
# send command to this 'indicate' characteristic
await client.write_gatt_char(atom_write, bytearray(decoder.cmd))
# Wait for up to one second to see if a callback comes in.
try:
await command_data_receiver.wait_for_message(5)
except asyncio.TimeoutError:
self.logger.warning("Timeout getting command data.")

self.logger.debug("Sensor values: %s", new_values)
command_sensor_data = decoder.decode_data(
logger=self.logger,
raw_data=command_data_receiver.message,
)

sensors.update(new_values)
if command_sensor_data is not None:
new_values: dict[str, float | str | None] = {}

# Stop notification handler
await client.stop_notify(atom_notify)
if (bat_data := command_sensor_data.get("BAT")) is not None:
new_values["battery"] = device.model.battery_percentage(
float(bat_data)
)

else:
for characteristic in service.characteristics:
uuid = characteristic.uuid
uuid_str = str(uuid)
if (
uuid in sensors_characteristics_uuid_str
and uuid_str in sensor_decoders
):
try:
data = await client.read_gatt_char(characteristic)
except BleakError as err:
self.logger.debug(
"Get service characteristics exception: %s", err
)
continue

sensor_data = sensor_decoders[uuid_str](data)

# Skipping for now
if "date_time" in sensor_data:
sensor_data.pop("date_time")

sensors.update(sensor_data)

# Manage radon values
if (d := sensor_data.get("radon_1day_avg")) is not None:
sensors["radon_1day_level"] = get_radon_level(float(d))
if not self.is_metric:
sensors["radon_1day_avg"] = (
float(d) * BQ_TO_PCI_MULTIPLIER
)
if (d := sensor_data.get("radon_longterm_avg")) is not None:
sensors["radon_longterm_level"] = get_radon_level(float(d))
if not self.is_metric:
sensors["radon_longterm_avg"] = (
float(d) * BQ_TO_PCI_MULTIPLIER
)

if uuid_str in command_decoders:
decoder = command_decoders[uuid_str]
command_data_receiver = decoder.make_data_receiver()

# Set up the notification handlers
await client.start_notify(characteristic, command_data_receiver)
# send command to this 'indicate' characteristic
await client.write_gatt_char(
characteristic, bytearray(decoder.cmd)
)
# Wait for up to one second to see if a callback comes in.
try:
await command_data_receiver.wait_for_message(5)
except asyncio.TimeoutError:
self.logger.warning("Timeout getting command data.")

command_sensor_data = decoder.decode_data(
logger=self.logger, raw_data=command_data_receiver.message
)
if command_sensor_data is not None:
new_values: dict[str, float | str | None] = {}
if (lux := command_sensor_data.get("LUX")) is not None:
new_values["lux"] = lux

if (co2 := command_sensor_data.get("CO2")) is not None:
new_values["co2"] = co2

if (voc := command_sensor_data.get("VOC")) is not None:
new_values["voc"] = voc

if (hum := command_sensor_data.get("HUM")) is not None:
new_values["humidity"] = hum / 100.0

if (temperature := command_sensor_data.get("TMP")) is not None:
# Temperature reported as kelvin
new_values["temperature"] = round(
temperature / 100.0 - 273.15, 2
)

if (noise := command_sensor_data.get("NOI")) is not None:
new_values["noise"] = noise

if (
bat_data := command_sensor_data.get("battery")
) is not None:
new_values["battery"] = device.model.battery_percentage(
float(bat_data)
)
if (pressure := command_sensor_data.get("PRS")) is not None:
new_values["pressure"] = pressure / (64 * 100)

if illuminance := command_sensor_data.get("illuminance"):
new_values["illuminance"] = illuminance
self.logger.debug("Sensor values: %s", new_values)

sensors.update(new_values)
sensors.update(new_values)

# Stop notification handler
await client.stop_notify(characteristic)
# Stop notification handler
await client.stop_notify(atom_notify)

def _handle_disconnect(
self, disconnect_future: asyncio.Future[bool], client: BleakClient
Expand Down

0 comments on commit cf2c0f4

Please sign in to comment.