Module imow.common.mowerstate
Expand source code
import logging
from imow.common.actions import IMowActions
logger = logging.getLogger("imow")
class MowerState:
ERROR_MAINSTATE_CODE = 1
def __init__(self, upstream: dict, imow): # Type: api: IMowApi
self.imow = imow
self.stateMessage = {
"short": "",
"long": "",
"legacyMessage": "",
"errorId": "",
"error": False,
}
self.machineError = None
self.machineState = None
self.replace_state(upstream)
def replace_state(self, upstream: dict):
self.__dict__.update(
map(
lambda kv: (kv[0].replace(" ", "_"), kv[1]),
upstream.items(),
)
)
self.update_state_messages()
async def update_setting(self, setting, new_value):
await self.imow.update_setting(
mower_id=self.id, setting=setting, new_value=new_value
)
def update_state_messages(self):
if self.status["mainState"] != self.ERROR_MAINSTATE_CODE:
(
self.stateMessage["short"],
self.stateMessage["long"],
) = self.imow.messages_user.get_status_message(
short_code=self.status["mainState"]
)
# Reset error indication
self.stateMessage["error"] = False
self.machineError = None
self.stateMessage["errorId"] = ""
else:
(
self.stateMessage["short"],
self.stateMessage["long"],
self.stateMessage["errorId"],
self.stateMessage["legacyMessage"],
) = self.imow.messages_user.get_error_message(
short_code=self.status["extraStatus"]
)
# Set error indication to true
self.stateMessage["error"] = True
self.machineError = self.stateMessage["errorId"]
self.generate_machine_state()
def generate_machine_state(self):
if self.status["mainState"] != self.ERROR_MAINSTATE_CODE:
(
state_msg_short,
state_msg_long,
) = self.imow.messages_en.get_status_message(
short_code=self.status["mainState"]
)
else:
(
state_msg_short,
state_msg_long,
errorId,
legacyMessage,
) = self.imow.messages_en.get_error_message(
short_code=self.status["extraStatus"]
)
cleaned_msg = (
state_msg_short.upper().replace(" ", "_").replace(".", "")
)
self.machineState = cleaned_msg
async def update_from_upstream(self):
response = await self.imow.receive_mower_by_id(self.id)
self.replace_state(response.__dict__)
return self
def get_current_task(self) -> str:
return self.stateMessage["short"]
async def get_current_status(self) -> dict:
await self.update_from_upstream()
return self.status
async def get_from_upstream(self): # Type: MowerState
return await self.update_from_upstream()
async def get_statistics(self) -> dict:
return await self.imow.receive_mower_statistics(self.id)
async def get_startpoints(self) -> dict:
return await self.imow.receive_mower_start_points(self.id)
async def get_mower_week_mow_time_in_hours(self) -> dict:
return await self.imow.receive_mower_week_mow_time_in_hours(self.id)
async def intent(
self,
imow_action: IMowActions,
first_action_value_param: any = "",
second_action_value_param: any = "",
test_mode: bool = False,
**kwargs
) -> None:
await self.imow.intent(
imow_action=imow_action,
first_action_value_param=first_action_value_param,
second_action_value_param=second_action_value_param,
mower_external_id=self.externalId,
test_mode=test_mode,
**kwargs
)
accountId: str = {str}
asmEnabled: bool = {bool}
automaticModeEnabled: bool = {bool}
boundryOffset: bool = {int} # 60
childLock: bool = {bool} # False
circumference: bool = {int} # 41
cModuleId: str = {str} # '0234d0fffab1d345'
codePage: bool = {int} # 0
coordinateLatitude: float = {float} # 54.123456
coordinateLongitude: float = {float} # 10.654321
corridorMode: bool = {int} # 0
demoModeEnabled: bool = {bool} # False
deviceType: bool = {int} # 24
deviceTypeDescription: str = {str} # 'RMI 422 PC'
edgeMowingMode: bool = {int} # 2
endOfContract: str = {str} # '2000-01-01T01:59:43+00:00'
energyMode: bool = {int} # 3
externalId: str = {str} # '0000000123456789'
firmwareVersion: str = {str} # '01v013'
gdprAccepted: bool = {bool} # True
gpsProtectionEnabled: bool = {bool} # True
id: str = {str} # '31466'
imsi: str = {str} # '16198732186461'
lastWeatherCheck: str = {str} # '2021-05-15T01:42:25+00:00'
ledStatus: bool = {int} # 11
localTimezoneOffset: bool = {int} # 7182
mappingIntelligentHomeDrive: bool = {int} # 0
mowerImageThumbnailUrl = {
str
} # 'https://app-cdn-appdata001-r-euwe-1b3d32.azureedge.net/device-images/mower-images/31466-2309868077
# -thumb.png'
mowerImageUrl = {
str
} # 'https://app-cdn-appdata001-r-euwe-1b3d32.azureedge.net/device-images/mower-images/31466-2309868077
# -photo.png'
name: str = {str} # 'Mährlin'
protectionLevel: bool = {int} # 1
rainSensorMode: bool = {int} # 1
smartLogic: dict = {dict: 13}
# {'dynamicMowingplan': False, 'mower': None, 'mowingArea': 100, 'mowingAreaInFeet': 1000, 'mowingAreaInMeter': 100,
# 'mowingGrowthAdjustment': 0, 'mowingTime': 60, 'mowingTimeManual': False, 'performedActivityTime': 3,
# 'smartNotifications': False, 'suggestedActivityTime': 135, 'totalActivityActiveTime': 0,
# 'weatherForecastEnabled': True}
softwarePacket: str = {str} # '12.03'
status: dict = {dict: 15}
# {'bladeService': False, 'chargeLevel': 66, 'extraStatus': 0, 'extraStatus1': 0, 'extraStatus2': 0,
# 'extraStatus3': 0, 'extraStatus4': 0, 'extraStatus5': 0, 'lastGeoPositionDate': '2021-05-15T07:12:10+00:00',
# 'lastNoErrorMainState': 7, 'lastSeenDate': '2021-05-13T23:58:31+00:00', 'mainState': 7, 'mower': None,
# 'online': True, 'rainStatus': False}
team = {None} # None
teamable: bool = {bool} # False
timeZone: str = {str} # 'Europe/Berlin'
unitFormat: bool = {int} # 0
version: str = {str} # '3.2.038'
Classes
class MowerState (upstream: dict, imow)
-
Expand source code
class MowerState: ERROR_MAINSTATE_CODE = 1 def __init__(self, upstream: dict, imow): # Type: api: IMowApi self.imow = imow self.stateMessage = { "short": "", "long": "", "legacyMessage": "", "errorId": "", "error": False, } self.machineError = None self.machineState = None self.replace_state(upstream) def replace_state(self, upstream: dict): self.__dict__.update( map( lambda kv: (kv[0].replace(" ", "_"), kv[1]), upstream.items(), ) ) self.update_state_messages() async def update_setting(self, setting, new_value): await self.imow.update_setting( mower_id=self.id, setting=setting, new_value=new_value ) def update_state_messages(self): if self.status["mainState"] != self.ERROR_MAINSTATE_CODE: ( self.stateMessage["short"], self.stateMessage["long"], ) = self.imow.messages_user.get_status_message( short_code=self.status["mainState"] ) # Reset error indication self.stateMessage["error"] = False self.machineError = None self.stateMessage["errorId"] = "" else: ( self.stateMessage["short"], self.stateMessage["long"], self.stateMessage["errorId"], self.stateMessage["legacyMessage"], ) = self.imow.messages_user.get_error_message( short_code=self.status["extraStatus"] ) # Set error indication to true self.stateMessage["error"] = True self.machineError = self.stateMessage["errorId"] self.generate_machine_state() def generate_machine_state(self): if self.status["mainState"] != self.ERROR_MAINSTATE_CODE: ( state_msg_short, state_msg_long, ) = self.imow.messages_en.get_status_message( short_code=self.status["mainState"] ) else: ( state_msg_short, state_msg_long, errorId, legacyMessage, ) = self.imow.messages_en.get_error_message( short_code=self.status["extraStatus"] ) cleaned_msg = ( state_msg_short.upper().replace(" ", "_").replace(".", "") ) self.machineState = cleaned_msg async def update_from_upstream(self): response = await self.imow.receive_mower_by_id(self.id) self.replace_state(response.__dict__) return self def get_current_task(self) -> str: return self.stateMessage["short"] async def get_current_status(self) -> dict: await self.update_from_upstream() return self.status async def get_from_upstream(self): # Type: MowerState return await self.update_from_upstream() async def get_statistics(self) -> dict: return await self.imow.receive_mower_statistics(self.id) async def get_startpoints(self) -> dict: return await self.imow.receive_mower_start_points(self.id) async def get_mower_week_mow_time_in_hours(self) -> dict: return await self.imow.receive_mower_week_mow_time_in_hours(self.id) async def intent( self, imow_action: IMowActions, first_action_value_param: any = "", second_action_value_param: any = "", test_mode: bool = False, **kwargs ) -> None: await self.imow.intent( imow_action=imow_action, first_action_value_param=first_action_value_param, second_action_value_param=second_action_value_param, mower_external_id=self.externalId, test_mode=test_mode, **kwargs ) accountId: str = {str} asmEnabled: bool = {bool} automaticModeEnabled: bool = {bool} boundryOffset: bool = {int} # 60 childLock: bool = {bool} # False circumference: bool = {int} # 41 cModuleId: str = {str} # '0234d0fffab1d345' codePage: bool = {int} # 0 coordinateLatitude: float = {float} # 54.123456 coordinateLongitude: float = {float} # 10.654321 corridorMode: bool = {int} # 0 demoModeEnabled: bool = {bool} # False deviceType: bool = {int} # 24 deviceTypeDescription: str = {str} # 'RMI 422 PC' edgeMowingMode: bool = {int} # 2 endOfContract: str = {str} # '2000-01-01T01:59:43+00:00' energyMode: bool = {int} # 3 externalId: str = {str} # '0000000123456789' firmwareVersion: str = {str} # '01v013' gdprAccepted: bool = {bool} # True gpsProtectionEnabled: bool = {bool} # True id: str = {str} # '31466' imsi: str = {str} # '16198732186461' lastWeatherCheck: str = {str} # '2021-05-15T01:42:25+00:00' ledStatus: bool = {int} # 11 localTimezoneOffset: bool = {int} # 7182 mappingIntelligentHomeDrive: bool = {int} # 0 mowerImageThumbnailUrl = { str } # 'https://app-cdn-appdata001-r-euwe-1b3d32.azureedge.net/device-images/mower-images/31466-2309868077 # -thumb.png' mowerImageUrl = { str } # 'https://app-cdn-appdata001-r-euwe-1b3d32.azureedge.net/device-images/mower-images/31466-2309868077 # -photo.png' name: str = {str} # 'Mährlin' protectionLevel: bool = {int} # 1 rainSensorMode: bool = {int} # 1 smartLogic: dict = {dict: 13} # {'dynamicMowingplan': False, 'mower': None, 'mowingArea': 100, 'mowingAreaInFeet': 1000, 'mowingAreaInMeter': 100, # 'mowingGrowthAdjustment': 0, 'mowingTime': 60, 'mowingTimeManual': False, 'performedActivityTime': 3, # 'smartNotifications': False, 'suggestedActivityTime': 135, 'totalActivityActiveTime': 0, # 'weatherForecastEnabled': True} softwarePacket: str = {str} # '12.03' status: dict = {dict: 15} # {'bladeService': False, 'chargeLevel': 66, 'extraStatus': 0, 'extraStatus1': 0, 'extraStatus2': 0, # 'extraStatus3': 0, 'extraStatus4': 0, 'extraStatus5': 0, 'lastGeoPositionDate': '2021-05-15T07:12:10+00:00', # 'lastNoErrorMainState': 7, 'lastSeenDate': '2021-05-13T23:58:31+00:00', 'mainState': 7, 'mower': None, # 'online': True, 'rainStatus': False} team = {None} # None teamable: bool = {bool} # False timeZone: str = {str} # 'Europe/Berlin' unitFormat: bool = {int} # 0 version: str = {str} # '3.2.038'
Class variables
var ERROR_MAINSTATE_CODE
var accountId : str
var asmEnabled : bool
var automaticModeEnabled : bool
var boundryOffset : bool
var cModuleId : str
var childLock : bool
var circumference : bool
var codePage : bool
var coordinateLatitude : float
var coordinateLongitude : float
var corridorMode : bool
var demoModeEnabled : bool
var deviceType : bool
var deviceTypeDescription : str
var edgeMowingMode : bool
var endOfContract : str
var energyMode : bool
var externalId : str
var firmwareVersion : str
var gdprAccepted : bool
var gpsProtectionEnabled : bool
var id : str
var imsi : str
var lastWeatherCheck : str
var ledStatus : bool
var localTimezoneOffset : bool
var mappingIntelligentHomeDrive : bool
var mowerImageThumbnailUrl
var mowerImageUrl
var name : str
var protectionLevel : bool
var rainSensorMode : bool
var smartLogic : dict
var softwarePacket : str
var status : dict
var team
var teamable : bool
var timeZone : str
var unitFormat : bool
var version : str
Methods
def generate_machine_state(self)
-
Expand source code
def generate_machine_state(self): if self.status["mainState"] != self.ERROR_MAINSTATE_CODE: ( state_msg_short, state_msg_long, ) = self.imow.messages_en.get_status_message( short_code=self.status["mainState"] ) else: ( state_msg_short, state_msg_long, errorId, legacyMessage, ) = self.imow.messages_en.get_error_message( short_code=self.status["extraStatus"] ) cleaned_msg = ( state_msg_short.upper().replace(" ", "_").replace(".", "") ) self.machineState = cleaned_msg
async def get_current_status(self) ‑> dict
-
Expand source code
async def get_current_status(self) -> dict: await self.update_from_upstream() return self.status
def get_current_task(self) ‑> str
-
Expand source code
def get_current_task(self) -> str: return self.stateMessage["short"]
async def get_from_upstream(self)
-
Expand source code
async def get_from_upstream(self): # Type: MowerState return await self.update_from_upstream()
async def get_mower_week_mow_time_in_hours(self) ‑> dict
-
Expand source code
async def get_mower_week_mow_time_in_hours(self) -> dict: return await self.imow.receive_mower_week_mow_time_in_hours(self.id)
async def get_startpoints(self) ‑> dict
-
Expand source code
async def get_startpoints(self) -> dict: return await self.imow.receive_mower_start_points(self.id)
async def get_statistics(self) ‑> dict
-
Expand source code
async def get_statistics(self) -> dict: return await self.imow.receive_mower_statistics(self.id)
async def intent(self, imow_action: IMowActions, first_action_value_param:
= '', second_action_value_param: = '', test_mode: bool = False, **kwargs) ‑> None -
Expand source code
async def intent( self, imow_action: IMowActions, first_action_value_param: any = "", second_action_value_param: any = "", test_mode: bool = False, **kwargs ) -> None: await self.imow.intent( imow_action=imow_action, first_action_value_param=first_action_value_param, second_action_value_param=second_action_value_param, mower_external_id=self.externalId, test_mode=test_mode, **kwargs )
def replace_state(self, upstream: dict)
-
Expand source code
def replace_state(self, upstream: dict): self.__dict__.update( map( lambda kv: (kv[0].replace(" ", "_"), kv[1]), upstream.items(), ) ) self.update_state_messages()
async def update_from_upstream(self)
-
Expand source code
async def update_from_upstream(self): response = await self.imow.receive_mower_by_id(self.id) self.replace_state(response.__dict__) return self
async def update_setting(self, setting, new_value)
-
Expand source code
async def update_setting(self, setting, new_value): await self.imow.update_setting( mower_id=self.id, setting=setting, new_value=new_value )
def update_state_messages(self)
-
Expand source code
def update_state_messages(self): if self.status["mainState"] != self.ERROR_MAINSTATE_CODE: ( self.stateMessage["short"], self.stateMessage["long"], ) = self.imow.messages_user.get_status_message( short_code=self.status["mainState"] ) # Reset error indication self.stateMessage["error"] = False self.machineError = None self.stateMessage["errorId"] = "" else: ( self.stateMessage["short"], self.stateMessage["long"], self.stateMessage["errorId"], self.stateMessage["legacyMessage"], ) = self.imow.messages_user.get_error_message( short_code=self.status["extraStatus"] ) # Set error indication to true self.stateMessage["error"] = True self.machineError = self.stateMessage["errorId"] self.generate_machine_state()