Module imow.api
Expand source code
from __future__ import annotations
import asyncio
import json
import logging
import sys
from datetime import datetime, timedelta
from typing import Tuple, Union, List, Any
from urllib.parse import quote
import aiohttp
from aiohttp import ClientSession, ClientResponseError, ClientResponse
from bs4 import BeautifulSoup
from furl import furl
from imow.common.actions import IMowActions
from imow.common.consts import IMOW_OAUTH_URI, IMOW_API_URI
from imow.common.exceptions import (
LoginError,
ApiMaintenanceError,
LanguageNotFoundError,
)
from imow.common.messages import Messages
from imow.common.mowerstate import MowerState
from imow.common.package_descriptions import (
python_major,
python_minor,
package_name,
)
logger = logging.getLogger("imow")
# Fail fast on interpreters older than the version the package declares.
# An explicit comparison is used instead of ``assert`` because assertions
# are removed when Python runs with ``-O``, which would skip this guard.
if sys.version_info < (int(python_major), int(python_minor)):
    raise RuntimeError(
        f"{package_name!r} requires Python {python_major}.{python_minor}+ (You have Python {sys.version})"
    )
# On Windows with Python 3.8 or newer (3.x only), switch asyncio to the
# selector based event loop policy at import time.
# NOTE(review): presumably needed because of aiohttp problems with the
# default Windows loop - confirm the exact reason.
if (
    sys.platform.startswith("win")
    and sys.version_info.major == 3
    and sys.version_info.minor >= 8
):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def validate_and_fix_datetime(value) -> str:
    """
    Normalize a time string to the "%Y-%m-%d %H:%M" format.

    The value is first parsed as "%Y-%m-%d %H:%M". If that fails a warning
    is logged and "%Y-%m-%d %H:%M:%S" is tried instead; the seconds are
    dropped in the returned string.

    :param value: the time string to check
    :return: the time formatted as "%Y-%m-%d %H:%M"
    :raises ValueError: if the value matches neither of the two formats
    """
    target_format = "%Y-%m-%d %H:%M"
    try:
        return datetime.strptime(value, target_format).strftime(target_format)
    except ValueError as ve:
        logger.warning(
            f" Try fixing given time format because {ve} in {value}"
        )
    # Second attempt: the same layout with seconds appended.
    try:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S").strftime(
            target_format
        )
    except ValueError as ve2:
        raise ValueError(f'Unsupported "time" argument: {value} -> {ve2}')
class IMowApi:
def __init__(
    self,
    email: str | None = None,
    password: str | None = None,
    token: str | None = None,
    aiohttp_session: ClientSession | None = None,
    lang: str = "en",
) -> None:
    """
    Store credentials, an optional existing token and an optional session.

    Only attributes are assigned here; no request is issued.

    :param email: login email, kept for later authentication
    :param password: login password, kept for later authentication
    :param token: an already known access token
    :param aiohttp_session: session to use for requests; other methods
        create one themselves when this is missing or closed
    :param lang: language code used for the login URL and the i18n
        message file
    """
    self.http_session: ClientSession | None = aiohttp_session
    # Values scraped from the login form, filled during authentication.
    self.csrf_token: str = ""
    self.requestId: str = ""
    self.access_token: str | None = token
    # Expiry is only known after an authentication done by this instance.
    self.token_expires: datetime | None = None
    self.api_email: str | None = email
    self.api_password: str | None = password
    self.lang = lang
    # Message catalogs, populated by fetch_messages().
    self.messages_user = None
    self.messages_en = None
async def close(self):
    """
    Clean up the aiohttp session, if one exists.

    Returns immediately when no session was ever created, instead of
    failing with ``AttributeError`` on ``None``.
    """
    session = self.http_session
    if session is None:
        return
    # NOTE(review): the short pause before closing is kept from the original
    # code; presumably it lets pending connections wind down - confirm.
    await asyncio.sleep(0.250)
    await session.close()
async def check_api_maintenance(self) -> None:
    """
    Query the maintenance endpoint and raise if the API is impaired.

    The request is sent through ``api_request`` with an empty
    ``Authorization`` header.

    :raises ApiMaintenanceError: if the reported status has a truthy
        ``serverDisrupted`` or ``serverDown`` entry
    """
    maintenance_url = "https://app-api-maintenance-r-euwe-4bf2d8.azurewebsites.net/maintenance/"
    response = await self.api_request(
        maintenance_url, "GET", headers={"Authorization": ""}
    )
    status = json.loads(await response.text())
    logger.debug(status)
    disrupted = status["serverDisrupted"]
    down = status["serverDown"]
    if not (disrupted or down):
        return
    raise ApiMaintenanceError(
        f"iMow API is under Maintenance -> "
        f"serverDisrupted: {disrupted}, serverDown: {down}, "
        f'affectedTill {status["affectedTill"]}'
    )
async def get_token(
    self,
    email: str = "",
    password: str = "",
    force_reauth=False,
    return_expire_time=False,
) -> Union[Tuple[str, datetime], str]:
    """
    Return the stored access token, authenticating first when needed.

    A new authentication happens when no token is stored or when
    ``force_reauth`` is set. In every case the token is validated with
    ``validate_token`` before it is returned.

    :param email: stihl webapp login email, not url-encoded
    :param password: stihl webapp login password
    :param force_reauth: force a re-authentication with email and password
    :param return_expire_time: also return the expiry datetime
    :return: the access token, or a tuple of access token and expiry
        datetime when ``return_expire_time`` is set
    :raises LoginError: if an authentication is required but email or
        password is missing
    """
    if not self.access_token or force_reauth:
        if email and password:
            self.api_password = password
            self.api_email = email
        # Both values are required. Check them before logging out so that a
        # call without usable credentials does not wipe the current token.
        if not self.api_email or not self.api_password:
            raise LoginError(
                "Got no credentials to authenticate, please provide"
            )
        if force_reauth:
            await self.api_logout()
            self.csrf_token = ""
            self.requestId = ""
            self.access_token: str = ""
            self.token_expires: datetime = None
        await self.__authenticate(self.api_email, self.api_password)
        logger.debug("Get Token: Re-Authenticate")
    await self.validate_token()
    if return_expire_time:
        return self.access_token, self.token_expires
    return self.access_token
async def api_logout(self):
    """
    Post the logout form to the oauth endpoint and drop related cookies.

    A session is created when none exists or the current one is closed.
    """
    if not self.http_session or self.http_session.closed:
        self.http_session = aiohttp.ClientSession(raise_for_status=True)
    async with self.http_session.post(
        "https://oauth2.imow.stihl.com/authentication/logout/",
        data={
            "csrf-token": self.csrf_token,
            "logoutUrl": "https://app.imow.stihl.com",
            "clientId": "9526273B-1477-47C6-801C-4356F58EF883",
            "cancelUrl": "https://app.imow.stihl.com",
        },
    ) as resp:
        await resp.read()
    # clear_domain() compares against cookie domains, i.e. bare host names.
    # Passing a full URL (scheme, trailing slash) never matches, so the
    # cookies would silently stay in the jar.
    self.http_session.cookie_jar.clear_domain("app.imow.stihl.com")
    self.http_session.cookie_jar.clear_domain("oauth2.imow.stihl.com")
async def validate_token(self, explicit_token: str = None) -> bool:
    """
    Validate a token by requesting the mower list with it.

    Without ``explicit_token`` the instance token is used. With it, the
    given token is installed for the duration of the request and the
    previous instance token is restored afterwards, including when the
    request fails.

    :param explicit_token: token to validate instead of the instance token
    :return: True if the request succeeded; failures propagate as the
        exception raised by ``receive_mowers``
    """
    if not explicit_token:
        await self.receive_mowers()
        return True
    previous_token = self.access_token
    self.access_token = explicit_token
    try:
        await self.receive_mowers()
    finally:
        # Always restore, otherwise a rejected token would stay installed.
        self.access_token = previous_token
    return True
async def __authenticate(
    self, email: str, password: str
) -> tuple[Any, datetime, ClientResponse]:
    """
    Log in with email and password and store the resulting access token.

    A fresh csrf token and request id are fetched first and sent together
    with the url-quoted credentials as a form body. The token and its
    lifetime are read from the fragment of the final response URL.

    :param email: stihl webapp login email, not url-encoded
    :param password: stihl webapp login password
    :return: the new access token, its expiry time and the raw response
    :raises LoginError: if the response URL carries no ``access_token``
    """
    await self.__fetch_new_csrf_token_and_request_id()
    # NOTE(review): the body ends with a space after the request id; this is
    # kept exactly as found - confirm whether upstream depends on it.
    form_body = (
        f"mail={quote(email)}&password={quote(password)}"
        f"&csrf-token={self.csrf_token}&requestId={self.requestId} "
    )
    response = await self.api_request(
        f"{IMOW_OAUTH_URI}/authentication/authenticate/?lang={self.lang}",
        "POST",
        payload=form_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    fragment_args = furl(response.real_url).fragment.args
    if "access_token" not in fragment_args:
        raise LoginError(
            "STIHL iMow did not return an access_token, check your credentials"
        )
    self.access_token = fragment_args["access_token"]
    lifetime = timedelta(seconds=int(fragment_args["expires_in"]))
    self.token_expires = datetime.now() + lifetime
    return self.access_token, self.token_expires, response
async def __fetch_new_csrf_token_and_request_id(
    self,
) -> tuple[str | list[str] | None, str | list[str] | None]:
    """
    Load the login page and extract the csrf token and the request id.

    Both values are read from the ``value`` attribute of the ``input``
    elements named ``csrf-token`` and ``requestId`` and stored on the
    instance for the following authentication request.

    :return: tuple of csrf token and request id
    :raises ProcessLookupError: if one of the input elements is missing
    """
    # URL needs whole redirect query parameter
    login_page_url = (
        f"{IMOW_OAUTH_URI}/authentication/?lang=de_DE&authorizationRedirectUrl=https%3A%2F%2Foauth2"
        ".imow.stihl.com%2Fauthorization%2F%3Fresponse_type%3Dtoken%26client_id%3D9526273B-1477-47C6-801C"
        "-4356F58EF883%26redirect_uri%3Dhttps%253A%252F%252Fapp.imow.stihl.com%252F%2523%252Fauthorize%26state"
    )
    response = await self.api_request(login_page_url, "GET")
    soup = BeautifulSoup(await response.text(), "html.parser")
    form_values = {}
    try:
        for field_name in ("csrf-token", "requestId"):
            # find() returns None for a missing element, so .get() raises
            # AttributeError, which is translated below.
            form_values[field_name] = soup.find(
                "input", {"name": field_name}
            ).get("value")
    except AttributeError:
        raise ProcessLookupError(
            "Did not find necessary csrf token and/or request id in html source"
        )
    self.csrf_token = form_values["csrf-token"]
    self.requestId = form_values["requestId"]
    logger.debug("CSRF: new token and request id <redacted>")
    return self.csrf_token, self.requestId
async def fetch_messages(self):
    """
    Download the i18n message files and store them on the instance.

    The English file is always loaded into ``messages_en``. For any other
    ``lang`` the matching file is loaded into ``messages_user``; for "en"
    both attributes point to the same object.

    :raises LanguageNotFoundError: if a message file request answers 404
    """
    # Allow calling this before any other request created a session.
    if not self.http_session or self.http_session.closed:
        self.http_session = aiohttp.ClientSession(raise_for_status=True)
    try:
        url_en = (
            "https://app.imow.stihl.com/assets/i18n/animations/en.json"
        )
        async with self.http_session.request("GET", url_en) as response_en:
            i18n_en = json.loads(await response_en.text())
            self.messages_en = Messages(i18n_en)
        if self.lang != "en":
            url_user = f"https://app.imow.stihl.com/assets/i18n/animations/{self.lang}.json"
            async with self.http_session.request(
                "GET", url_user
            ) as response_user:
                i18n_user = json.loads(await response_user.text())
                self.messages_user = Messages(i18n_user)
        else:
            self.messages_user = self.messages_en
    except ClientResponseError as e:
        if e.status == 404:
            raise LanguageNotFoundError(
                f"Language-File '{self.lang}.json' not found on imow upstream ("
                f"https://app.imow.stihl.com/assets/i18n/animations/{self.lang}.json)"
            ) from e
        # Other HTTP errors stay non-fatal as before, but are no longer
        # dropped without a trace.
        logger.warning(
            f"Could not fetch i18n messages (HTTP status {e.status}): {e.message}"
        )
async def api_request(
    self, url, method, payload=None, headers=None
) -> aiohttp.ClientResponse:
    """
    Do a standardized request against the stihl imow webapi, with predefined headers.

    Before the request is sent a session is created when needed, the
    message catalogs are loaded when missing, and the token is renewed
    when it expires in less than one day.

    :param url: The target URL
    :param method: The Method to use
    :param payload: optional payload
    :param headers: optional headers, merged over the predefined ones
    :return: the aiohttp.ClientResponse, with its body already read
    :raises ClientResponseError: for HTTP error statuses
    :raises ApiMaintenanceError: raised by the maintenance check that runs
        after a status 500
    """
    if not self.http_session or self.http_session.closed:
        self.http_session = aiohttp.ClientSession(raise_for_status=True)
    if not self.messages_en:
        await self.fetch_messages()
    # Compare the full timedelta: ``.days <= 1`` was already true with
    # almost two days left, which contradicts the logged reason.
    if self.token_expires and (
        self.token_expires - datetime.now() < timedelta(days=1)
    ):
        logger.info(
            "Fetching new access_token because old one expires in less than 1 day"
        )
        await self.get_token(force_reauth=True)
    if not payload:
        payload = {}
    headers_obj = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
        "Authorization": f'Bearer {self.access_token if self.access_token else ""}',
        "Origin": "https://app.imow.stihl.com",
        "DNT": "1",
        "Connection": "keep-alive",
        "Referer": "https://app.imow.stihl.com/",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache",
        "TE": "Trailers",
        "Content-Type": "application/json",
    }
    if headers:
        headers_obj.update(headers)
    try:
        async with self.http_session.request(
            method, url, headers=headers_obj, data=payload
        ) as response:
            # Read the body inside the context so it is still available
            # after the connection has been released.
            await response.read()
            response.raise_for_status()
            return response
    except ClientResponseError as e:
        # The maintenance check itself goes through api_request. The flag
        # stops unbounded recursion when that endpoint answers 500 too.
        if e.status == 500 and not getattr(
            self, "_maintenance_check_active", False
        ):
            self._maintenance_check_active = True
            try:
                await self.check_api_maintenance()
            finally:
                self._maintenance_check_active = False
        raise
async def intent(
    self,
    imow_action: IMowActions,
    mower_name: str = "",
    mower_id: str = "",
    mower_external_id: str = "",
    first_action_value_param: Any = "",
    second_action_value_param: Any = "",
    test_mode: bool = False,
    **kwargs,
) -> Union[aiohttp.ClientResponse, bool]:
    """
    Intend to do an action by posting an action object to the mower-actions endpoint.

    The action object contains the action name from the given enum and an
    action value. The action value is one of:
    <MowerExternalId>,
    <MowerExternalId,DurationInMinutesDividedBy10,StartPoint> for START_MOWING_FROM_POINT,
    <MowerExternalId,EndTime> or <MowerExternalId,EndTime,StartTime> for START_MOWING.

    :param imow_action: Anything from imow.common.actions
    :param mower_name: something to identify which mower is used
    :param mower_id: something to identify which mower is used
    :param mower_external_id:
        necessary identifier of the mower for actions.
        This is looked up if only mower_name or mower_id is provided
    :param first_action_value_param: first argument passed into the action call request to the api. Can be one of the following contents:
        A duration: minutes of intended mowing. Used by START_MOWING_FROM_POINT. Defaults to '30' minutes.
        An endtime: a datetime when to stop mowing, e.g. '2023-08-12 20:50'. Used by START_MOWING
    :param second_action_value_param: second argument passed into the action call request to the api. Can be one of the following contents:
        A startpoint: from which the mowing shall start. Used by START_MOWING_FROM_POINT. Defaults to '0'.
        A starttime: a datetime when to start mowing, e.g. '2023-08-12 20:50'. Used by START_MOWING
    :param test_mode: Do not issue the request to the server
    :param kwargs: the keys ``duration``, ``startpoint``, ``endtime`` and
        ``starttime`` override the two positional value parameters
    :return: the response of the request, or True in test mode
    :raises AttributeError: if no mower identifier is given, the external id
        is shorter than 16 characters, or the endtime is not after the
        starttime
    """
    if test_mode:
        logger.warning("TEST_MODE: Request will not be issued to server.")
    if not mower_external_id and not mower_id and not mower_name:
        raise AttributeError(
            "Need some mower to work on. Please specify mower_[name|id|action_id]"
        )
    # Resolve the external id; a given name takes precedence over a given id.
    if not mower_external_id and mower_name:
        mower_external_id = await self.get_mower_action_id_from_name(
            mower_name
        )
    if not mower_external_id and mower_id:
        mower_external_id = await self.get_mower_action_id_from_id(
            mower_id
        )
    # NOTE(review): only ids shorter than 16 characters are rejected although
    # the message says "exactly 16" - confirm the intended rule.
    if len(mower_external_id) < 16:
        raise AttributeError(
            f"Invalid mower_action_id, need exactly 16 chars, got {len(mower_external_id)} in {mower_external_id}"
        )
    url = f"{IMOW_API_URI}/mower-actions/"
    given_kwargs = kwargs.items()
    if len(given_kwargs) > 0:
        logger.debug(
            "Translating given intent **kwargs to action_value_param"
        )
        # duration/endtime feed the first value, startpoint/starttime the
        # second one; empty values are ignored.
        for key, value in given_kwargs:
            logger.debug(" {0} = {1}".format(key, value))
            if key == "duration" and value:
                first_action_value_param = value
            if key == "startpoint" and value:
                second_action_value_param = value
            if key == "endtime" and value:
                first_action_value_param = validate_and_fix_datetime(value)
            if key == "starttime" and value:
                second_action_value_param = validate_and_fix_datetime(
                    value
                )
        logger.debug(
            f" -> first_action_value_param (end-time / duration): {first_action_value_param} "
        )
        logger.debug(
            f" -> second_action_value_param (start-time / startpoint): {second_action_value_param} "
        )
    logger.debug(
        f'Build action object for: {imow_action} -> "{imow_action.value}"'
    )
    # Build other action values depending on given ACTION
    if (
        imow_action == IMowActions.START_MOWING_FROM_POINT
    ):  # Add the duration and startpoint parameter
        # NOTE(review): true division yields a float, so the value is sent
        # as e.g. "3.0" rather than "3" - confirm upstream accepts this.
        duration = (
            str(int(first_action_value_param) / 10)
            if first_action_value_param
            else 30 / 10
        )
        startpoint = (
            str(second_action_value_param)
            if second_action_value_param
            else "0"
        )
        action_value = f"{mower_external_id},{duration},{startpoint}"
    elif (
        imow_action == IMowActions.START_MOWING
    ):  # by start- and/or endtime
        endtime = (
            str(first_action_value_param)
            if first_action_value_param != ""
            else None
        )
        starttime = (
            str(second_action_value_param)
            if second_action_value_param != ""
            else None
        )
        # Create some defaults
        if starttime and not endtime:
            # Run for 2 hours from start time if only a start time is given
            endtime = (
                datetime.strptime(starttime, "%Y-%m-%d %H:%M")
                + timedelta(hours=2)
            ).strftime("%Y-%m-%d %H:%M")
        elif not starttime and not endtime:
            # Run for 2 hours from now if no time is given
            now = datetime.now()
            logger.warning(
                f"No start- or endtime is given. Creating an action object with endtime 2 hours from now"
                f"based from this machines local timezone. datetime.now() gives {now.strftime('%Y-%m-%d %H:%M')}."
            )
            endtime = (now + timedelta(hours=2)).strftime("%Y-%m-%d %H:%M")
        if starttime:
            # Make sure endtime is after starttime
            if datetime.strptime(
                starttime, "%Y-%m-%d %H:%M"
            ) < datetime.strptime(endtime, "%Y-%m-%d %H:%M"):
                action_value = f"{mower_external_id},{endtime},{starttime}"
            else:
                raise AttributeError(
                    f"Time when to end: {endtime} is not afer time to start: {starttime}. This has to be until time travel."
                )
        else:
            # Only an endtime is known: start time is left out entirely.
            action_value = f"{mower_external_id},{endtime}"
    else:
        # Every other action only needs the external id of the mower.
        action_value = mower_external_id
    action_object = {
        "actionName": imow_action.value,
        "actionValue": action_value
        # Shapes built above:
        #   <MowerExternalId>
        #   <MowerExternalId,DurationInMinutesDividedBy10,StartPoint>
        #   <MowerExternalId,EndTime> or <MowerExternalId,EndTime,StartTime>
    }
    logger.debug(
        f"Intent sent as request body to imow api for mower with identifier: '{mower_name}/{mower_id}/{mower_external_id}'"
    )
    logger.info(f" {action_object}")
    payload = json.dumps(action_object)
    if not test_mode:
        response = await self.api_request(url, "POST", payload=payload)
        if response.ok:
            logger.debug(
                f"Success: Created mower (extId:{mower_external_id}) ActionObject with contents:"
            )
            logger.debug(f" {action_object}")
            logger.debug(f" -> (HTTP Status {response.status})")
        else:
            logger.error(f"No success with mower-action: {payload}")
        return response
    else:
        # Test mode: everything was built and logged, nothing is sent.
        logger.warning(
            f"TEST_MODE: (NOT) Created mower (extId:{mower_external_id}) ActionObject with contents:"
        )
        logger.warning(f" {action_object}")
        return True
async def update_setting(self, mower_id, setting, new_value) -> MowerState:
    """
    Change one setting of a mower and return the resulting state.

    The current state is fetched first. If the setting already has the
    requested value nothing is written and the state is fetched again and
    returned. Otherwise the full settings payload is sent with a PUT
    request and the state object is replaced with the response.

    :param mower_id: id of the mower to change
    :param setting: key of the payload field to change
    :param new_value: value the field shall get
    :return: the mower state after the operation
    :raises KeyError: if ``setting`` is not one of the payload fields
    """
    mower_state = await self.receive_mower_by_id(mower_id)
    copied_attributes = (
        "id",
        "unitFormat",
        "name",
        "teamable",
        "accountId",
        "childLock",
        "corridorMode",
        "mappingIntelligentHomeDrive",
        "rainSensorMode",
        "edgeMowingMode",
        "asmEnabled",
        "gpsProtectionEnabled",
        "automaticModeEnabled",
        "localTimezoneOffset",
    )
    payload_fields = {
        attribute: getattr(mower_state, attribute)
        for attribute in copied_attributes
    }
    # Key order is kept as in the original payload.
    payload_fields["mowingTimeManual"] = None
    payload_fields["mowingTime"] = None
    payload_fields["team"] = mower_state.team
    payload_fields["timeZone"] = mower_state.timeZone

    needs_update = payload_fields[setting] != new_value
    if not needs_update:
        logger.info(f"{setting} is already {new_value}.")
        return await self.receive_mower_by_id(mower_id)

    payload_fields[setting] = new_value
    request_headers = {
        "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
        "Content-Type": "application/json",
        "Origin": "https://app.imow.stihl.com",
        "Connection": "keep-alive",
        "Referer": "https://app.imow.stihl.com/",
        "TE": "Trailers",
    }
    encoded_body = json.dumps(payload_fields, indent=2).encode("utf-8")
    response = await self.api_request(
        url=f"{IMOW_API_URI}/mowers/{mower_state.id}/",
        method="PUT",
        payload=encoded_body,
        headers=request_headers,
    )
    mower_state.replace_state(json.loads(await response.text()))
    return mower_state
async def get_status_by_name(self, mower_name: str) -> dict:
    """
    Return the status of the first mower with the given name.

    :param mower_name: name to look for in the mower list
    :return: the ``status`` attribute of the matching mower
    :raises LookupError: if no mower has this name
    """
    logger.debug(f"get_status_by_name: {mower_name}")
    all_mowers = await self.receive_mowers()
    found = next(
        (candidate for candidate in all_mowers if candidate.name == mower_name),
        None,
    )
    if found is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    return found.status
async def get_status_by_id(self, mower_id: Union[str, int]) -> dict:
    """
    Return the status of the mower with the given id.

    The accepted types were previously written as a default value
    (``mower_id=(str, int)``), so a call without argument queried a
    nonsensical id. The argument is now a required, annotated parameter.

    :param mower_id: id of the mower, as string or integer
    :return: the ``status`` attribute of the mower
    :raises LookupError: if fetching the mower raises a ConnectionError
    """
    # str() is a no-op for plain strings and converts integer ids.
    mower_id = str(mower_id)
    logger.debug(f"get_status_by_id: {mower_id}")
    try:
        mower = await self.receive_mower_by_id(mower_id)
        return mower.status
    except ConnectionError as err:
        raise LookupError(
            f"Mower with id {mower_id} not found in upstream"
        ) from err
async def get_status_by_action_id(self, mower_action_id: str) -> dict:
    """
    Return the status of the first mower with the given external id.

    :param mower_action_id: value compared against ``externalId``
    :return: the ``status`` attribute of the matching mower
    :raises LookupError: if no mower has this external id
    """
    logger.debug(f"get_status_by_action_id: {mower_action_id}")
    all_mowers = await self.receive_mowers()
    found = next(
        (
            candidate
            for candidate in all_mowers
            if candidate.externalId == mower_action_id
        ),
        None,
    )
    if found is None:
        raise LookupError(
            f"Mower with externalId {mower_action_id} not found in upstream"
        )
    return found.status
async def get_mower_action_id_from_name(self, mower_name: str) -> str:
    """
    Return the external id of the first mower with the given name.

    :param mower_name: name to look for in the mower list
    :return: the ``externalId`` attribute of the matching mower
    :raises LookupError: if no mower has this name
    """
    logger.debug(f"get_mower_action_id_from_name: {mower_name}")
    all_mowers = await self.receive_mowers()
    found = next(
        (candidate for candidate in all_mowers if candidate.name == mower_name),
        None,
    )
    if found is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    return found.externalId
async def get_mower_action_id_from_id(self, mower_id: str) -> str:
    """
    Return the external id of the mower with the given id.

    :param mower_id: id of the mower to fetch
    :return: the ``externalId`` attribute of the mower
    :raises LookupError: if fetching the mower raises a ConnectionError
    """
    logger.debug(f"get_mower_action_id_from_id: {mower_id}")
    try:
        mower = await self.receive_mower_by_id(mower_id)
        logger.debug(f" - {mower.externalId}")
        return mower.externalId
    except ConnectionError:
        raise LookupError(
            f"Mower with id {mower_id} not found in upstream"
        )
async def get_mower_id_from_name(self, mower_name: str) -> str:
    """
    Return the id of the first mower with the given name.

    :param mower_name: name to look for in the mower list
    :return: the ``id`` attribute of the matching mower
    :raises LookupError: if no mower has this name
    """
    logger.debug(f"get_mower_id_from_name: {mower_name}")
    all_mowers = await self.receive_mowers()
    found = next(
        (candidate for candidate in all_mowers if candidate.name == mower_name),
        None,
    )
    if found is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    return found.id
async def receive_mowers(self) -> List[MowerState]:
    """
    Fetch all mowers and wrap each entry in a ``MowerState``.

    :return: list with one ``MowerState`` per entry of the response
    """
    logger.debug("receive_mowers: ")
    response = await self.api_request(f"{IMOW_API_URI}/mowers/", "GET")
    raw_entries = json.loads(await response.text())
    mowers = [MowerState(entry, self) for entry in raw_entries]
    for mower in mowers:
        logger.debug(f" - {mower.name}")
    return mowers
async def receive_mower_by_name(self, mower_name: str) -> MowerState:
    """
    Return the first mower with the given name.

    :param mower_name: name to look for in the mower list
    :return: the matching ``MowerState``
    :raises LookupError: if no mower has this name
    """
    logger.debug(f"get_mower_from_name: {mower_name}")
    all_mowers = await self.receive_mowers()
    found = next(
        (candidate for candidate in all_mowers if candidate.name == mower_name),
        None,
    )
    if found is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    logger.debug(found)
    return found
async def receive_mower_by_id(self, mower_id: str) -> MowerState:
    """
    Fetch a single mower by id.

    :param mower_id: id placed into the request path
    :return: a ``MowerState`` built from the decoded response
    """
    logger.debug(f"receive_mower: {mower_id}")
    endpoint = f"{IMOW_API_URI}/mowers/{mower_id}/"
    response = await self.api_request(endpoint, "GET")
    raw_state = json.loads(await response.text())
    mower = MowerState(raw_state, self)
    logger.debug(mower)
    return mower
async def receive_mower_statistics(self, mower_id: str) -> dict:
    """
    Fetch the statistic resource of a mower.

    :param mower_id: id placed into the request path
    :return: the decoded JSON response
    """
    logger.debug(f"receive_mower_statistics: {mower_id}")
    endpoint = f"{IMOW_API_URI}/mowers/{mower_id}/statistic/"
    response = await self.api_request(endpoint, "GET")
    body = await response.text()
    stats = json.loads(body)
    logger.debug(stats)
    return stats
async def receive_mower_week_mow_time_in_hours(
    self, mower_id: str
) -> dict:
    """
    Fetch the week-mow-time-in-hours statistics of a mower.

    :param mower_id: id placed into the request path
    :return: the decoded JSON response
    """
    logger.debug(f"receive_mower_week_mow_time_in_hours: {mower_id}")
    endpoint = (
        f"{IMOW_API_URI}/mowers/{mower_id}/statistics/week-mow-time-in-hours/"
    )
    response = await self.api_request(endpoint, "GET")
    body = await response.text()
    mow_times = json.loads(body)
    logger.debug(mow_times)
    return mow_times
async def receive_mower_start_points(self, mower_id: str) -> dict:
    """
    Fetch the start points of a mower and log each of them.

    :param mower_id: id placed into the request path
    :return: the decoded JSON response
    """
    logger.debug(f"receive_mower_start_points: {mower_id}")
    endpoint = f"{IMOW_API_URI}/mowers/{mower_id}/start-points/"
    response = await self.api_request(endpoint, "GET")
    body = await response.text()
    start_points = json.loads(body)
    for entry in start_points:
        logger.debug(f" - {entry}")
    return start_points
Functions
def validate_and_fix_datetime(value) ‑> str
-
Try to convert and validate the given string from "%Y-%m-%d %H:%M" or "%Y-%m-%d %H:%M:%S" into a datetime object and give the needed "%Y-%m-%d %H:%M" string back. :param value: the string to be checked :return: the correctly formatted string
Expand source code
def validate_and_fix_datetime(value) -> str:
    """
    Try to convert and validate the given string from "%Y-%m-%d %H:%M" or
    "%Y-%m-%d %H:%M:%S" into a datetime object and give the needed
    "%Y-%m-%d %H:%M" string back.

    :param value: the string to be checked
    :return: the correctly formatted string
    :raises ValueError: if the value matches neither format
    """
    try:
        datetime_object = datetime.strptime(value, "%Y-%m-%d %H:%M")
        return datetime_object.strftime("%Y-%m-%d %H:%M")
    except ValueError as ve:
        logger.warning(
            f" Try fixing given time format because {ve} in {value}"
        )
        # Fallback: accept the same layout with seconds and drop them.
        try:
            datetime_object = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
            return datetime_object.strftime("%Y-%m-%d %H:%M")
        except ValueError as ve2:
            raise ValueError(f'Unsupported "time" argument: {value} -> {ve2}')
Classes
class IMowApi (email: str = None, password: str = None, token: str = None, aiohttp_session: ClientSession = None, lang: str = 'en')
-
Expand source code
class IMowApi:
    """
    Asynchronous client for the STIHL iMow web API.

    An instance keeps the aiohttp session, the access token with its expiry
    time, the login credentials, and the message catalogues loaded by
    :meth:`fetch_messages`.
    """

    def __init__(
        self,
        email: str = None,
        password: str = None,
        token: str = None,
        aiohttp_session: ClientSession = None,
        lang: str = "en",
    ) -> None:
        """
        Store credentials and session; no request is issued here.

        :param email: stihl webapp login email non-url-encoded
        :param password: stihl webapp login password
        :param token: an already known access token
        :param aiohttp_session: session to reuse; one is created on demand if missing
        :param lang: language code used for the login URL and the message catalogue
        """
        self.http_session: ClientSession = aiohttp_session
        self.csrf_token: str = ""
        self.requestId: str = ""
        self.access_token: str = token
        self.token_expires: datetime = None
        self.api_email: str = email
        self.api_password: str = password
        self.lang = lang
        self.messages_user = None
        self.messages_en = None

    async def close(self):
        """Cleanup the aiohttp Session"""
        session = self.http_session
        # Nothing to clean up if no session was ever created or it is
        # already closed; previously this raised AttributeError on None.
        if session is None or session.closed:
            return
        await asyncio.sleep(0.250)
        await session.close()

    async def check_api_maintenance(self) -> None:
        """
        Query the maintenance endpoint and raise if the API is impaired.

        :raises ApiMaintenanceError: if the reported status has a truthy
            ``serverDisrupted`` or ``serverDown`` entry
        """
        url = "https://app-api-maintenance-r-euwe-4bf2d8.azurewebsites.net/maintenance/"
        headers = {
            "Authorization": "",
        }
        response = await self.api_request(url, "GET", headers=headers)
        status = json.loads(await response.text())
        logger.debug(status)
        if status["serverDisrupted"] or status["serverDown"]:
            msg = (
                f"iMow API is under Maintenance -> "
                f'serverDisrupted: {status["serverDisrupted"]}, serverDown: {status["serverDown"]}, '
                f'affectedTill {status["affectedTill"]}'
            )
            raise ApiMaintenanceError(msg)

    async def get_token(
        self,
        email: str = "",
        password: str = "",
        force_reauth=False,
        return_expire_time=False,
    ) -> Union[Tuple[str, datetime], str]:
        """
        look for a token, if present, return. Else authenticate and store new token

        :param return_expire_time: also return the expiry datetime
        :param email: stihl webapp login email non-url-encoded
        :param password: stihl webapp login password
        :param force_reauth: Force a re-authentication with username and password
        :return: the access token, or a tuple of the access token and a
            datetime object containing the expire date
        :raises LoginError: if authentication is needed but email or password is missing
        """
        if not self.access_token or force_reauth:
            if email and password:
                self.api_password = password
                self.api_email = email
            if force_reauth:
                await self.api_logout()
                self.csrf_token = ""
                self.requestId = ""
                self.access_token: str = ""
                self.token_expires: datetime = None
            # Both values are required: with only one of them the login
            # would fail later while url-encoding the missing value.
            if not self.api_email or not self.api_password:
                raise LoginError(
                    "Got no credentials to authenticate, please provide"
                )
            await self.__authenticate(self.api_email, self.api_password)
            logger.debug("Get Token: Re-Authenticate")

        await self.validate_token()
        if return_expire_time:
            return self.access_token, self.token_expires
        else:
            return self.access_token

    async def api_logout(self):
        """
        Post the logout form to the OAuth endpoint and drop the stored cookies.

        A session is created first if none exists or the current one is closed.
        """
        if not self.http_session or self.http_session.closed:
            self.http_session = aiohttp.ClientSession(raise_for_status=True)
        async with self.http_session.post(
            "https://oauth2.imow.stihl.com/authentication/logout/",
            data={
                "csrf-token": self.csrf_token,
                "logoutUrl": "https://app.imow.stihl.com",
                "clientId": "9526273B-1477-47C6-801C-4356F58EF883",
                "cancelUrl": "https://app.imow.stihl.com",
            },
        ) as resp:
            await resp.read()
        # clear_domain() matches against cookie domains, so it must be given
        # the bare host name; a full URL never matches any stored cookie.
        self.http_session.cookie_jar.clear_domain("app.imow.stihl.com")
        self.http_session.cookie_jar.clear_domain("oauth2.imow.stihl.com")

    async def validate_token(self, explicit_token: str = None) -> bool:
        """
        Validate a token by requesting the mower list with it.

        :param explicit_token: token to check instead of the instance token;
            the instance token is put back afterwards
        :return: True if the request succeeded; failures propagate as exceptions
        """
        old_token = None
        if explicit_token:
            # save old instance token and place temp token for validation
            old_token = self.access_token
            self.access_token = explicit_token
        try:
            await self.receive_mowers()
        finally:
            if explicit_token:
                # Reset instance token, also when the validation request failed
                self.access_token = old_token
        return True

    async def __authenticate(
        self, email: str, password: str
    ) -> tuple[Any, datetime, ClientResponse]:
        """
        try the authentication request with fetched csrf and requestId payload

        :param email: stihl webapp login email non-url-encoded
        :param password: stihl webapp login password
        :return: the newly created access token, and expire time besides the legacy response
        :raises LoginError: if the final URL fragment carries no access_token
        """
        await self.__fetch_new_csrf_token_and_request_id()
        url = f"{IMOW_OAUTH_URI}/authentication/authenticate/?lang={self.lang}"
        encoded_mail = quote(email)
        encoded_password = quote(password)
        payload = (
            f"mail={encoded_mail}&password={encoded_password}"
            f"&csrf-token={self.csrf_token}&requestId={self.requestId} "
        )
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }
        response = await self.api_request(
            url, "POST", payload=payload, headers=headers
        )
        # The token is read from the fragment of the final response URL.
        response_url_query_args = furl(response.real_url).fragment.args
        if "access_token" not in response_url_query_args:
            raise LoginError(
                "STIHL iMow did not return an access_token, check your credentials"
            )
        self.access_token = response_url_query_args["access_token"]
        self.token_expires = datetime.now() + timedelta(
            seconds=int(response_url_query_args["expires_in"])
        )
        return self.access_token, self.token_expires, response

    async def __fetch_new_csrf_token_and_request_id(
        self,
    ) -> tuple[str | list[str] | None, str | list[str] | None]:
        """
        Fetch a new csrf_token and requestId to do the authentication as expected by the api

        csrf_token and requestId are used as payload within authentication

        :return: tuple of the csrf token and the request id
        :raises ProcessLookupError: if one of the two input fields is missing in the page
        """
        # URL needs whole redirect query parameter
        url = (
            f"{IMOW_OAUTH_URI}/authentication/?lang=de_DE&authorizationRedirectUrl=https%3A%2F%2Foauth2"
            ".imow.stihl.com%2Fauthorization%2F%3Fresponse_type%3Dtoken%26client_id%3D9526273B-1477-47C6-801C"
            "-4356F58EF883%26redirect_uri%3Dhttps%253A%252F%252Fapp.imow.stihl.com%252F%2523%252Fauthorize%26state"
        )
        response = await self.api_request(url, "GET")
        soup = BeautifulSoup(await response.text(), "html.parser")
        try:
            upstream_csrf_token = soup.find(
                "input", {"name": "csrf-token"}
            ).get("value")
            upstream_request_id = soup.find(
                "input", {"name": "requestId"}
            ).get("value")
        except AttributeError:
            # soup.find() returned None for one of the inputs
            raise ProcessLookupError(
                "Did not find necessary csrf token and/or request id in html source"
            )
        self.csrf_token = upstream_csrf_token
        self.requestId = upstream_request_id
        logger.debug("CSRF: new token and request id <redacted>")
        return self.csrf_token, self.requestId

    async def fetch_messages(self):
        """
        Load the English message catalogue and the one for ``self.lang``.

        The results are stored in ``messages_en`` and ``messages_user``; for
        ``lang == "en"`` both refer to the same object.

        :raises LanguageNotFoundError: if a catalogue request answers with HTTP 404
        :raises ClientResponseError: for any other HTTP error status
        """
        # Same session guard as api_request()/api_logout(), so the method
        # also works when it is the first call made on the instance.
        if not self.http_session or self.http_session.closed:
            self.http_session = aiohttp.ClientSession(raise_for_status=True)
        try:
            url_en = (
                "https://app.imow.stihl.com/assets/i18n/animations/en.json"
            )
            async with self.http_session.request("GET", url_en) as response_en:
                i18n_en = json.loads(await response_en.text())
                self.messages_en = Messages(i18n_en)
            if self.lang != "en":
                url_user = f"https://app.imow.stihl.com/assets/i18n/animations/{self.lang}.json"
                async with self.http_session.request(
                    "GET", url_user
                ) as response_user:
                    i18n_user = json.loads(await response_user.text())
                    self.messages_user = Messages(i18n_user)
            else:
                self.messages_user = self.messages_en
        except ClientResponseError as e:
            if e.status == 404:
                raise LanguageNotFoundError(
                    f"Language-File '{self.lang}.json' not found on imow upstream ("
                    f"https://app.imow.stihl.com/assets/i18n/animations/{self.lang}.json)"
                ) from e
            # Any other HTTP error used to be dropped silently, leaving the
            # catalogues unset; let the caller see the real cause instead.
            raise

    async def api_request(
        self, url, method, payload=None, headers=None
    ) -> aiohttp.ClientResponse:
        """
        Do a standardized request against the stihl imow webapi, with predefined headers

        Before the request a session is created if needed, the message
        catalogues are loaded once, and the token is renewed when its
        remaining lifetime has a ``days`` part of at most 1.

        :param url: The target URL
        :param method: The Method to use
        :param payload: optional payload
        :param headers: optional update headers
        :return: the aiohttp.ClientResponse
        :raises ClientResponseError: on an HTTP error status
        :raises ApiMaintenanceError: on HTTP 500 while maintenance is reported
        """
        if not self.http_session or self.http_session.closed:
            self.http_session = aiohttp.ClientSession(raise_for_status=True)
        if not self.messages_en:
            await self.fetch_messages()
        if (
            self.token_expires
            and (self.token_expires - datetime.now()).days <= 1
        ):
            logger.info(
                "Fetching new access_token because old one expires in less than 1 day"
            )
            await self.get_token(force_reauth=True)
        if not payload:
            payload = {}
        # Built after the possible re-authentication so that the
        # Authorization header carries the current token.
        headers_obj = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
            "Authorization": f'Bearer {self.access_token if self.access_token else ""}',
            "Origin": "https://app.imow.stihl.com",
            "DNT": "1",
            "Connection": "keep-alive",
            "Referer": "https://app.imow.stihl.com/",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
            "TE": "Trailers",
            "Content-Type": "application/json",
        }
        if headers:
            headers_obj.update(headers)
        try:
            async with self.http_session.request(
                method, url, headers=headers_obj, data=payload
            ) as response:
                # Read the body inside the context so it stays available
                # on the returned response object.
                await response.read()
                response.raise_for_status()
                return response
        except ClientResponseError as e:
            if e.status == 500:
                await self.check_api_maintenance()
            raise e

    async def intent(
        self,
        imow_action: IMowActions,
        mower_name: str = "",
        mower_id: str = "",
        mower_external_id: str = "",
        first_action_value_param: any = "",
        second_action_value_param: any = "",
        test_mode: bool = False,
        **kwargs,
    ) -> aiohttp.ClientResponse:
        """
        Intent to do an action. This seems to create a job object upstream.

        The action object contains the action name and an action value. The
        value is ``<MowerExternalId>``,
        ``<MowerExternalId,DurationInMinutesDividedBy10,StartPoint>`` for
        START_MOWING_FROM_POINT, or ``<MowerExternalId,EndTime[,StartTime]>``
        for START_MOWING.

        :param imow_action: Anything from imow.common.actions
        :param mower_name: sth to identify which mower is used
        :param mower_id: sth to identify which mower is used
        :param mower_external_id: necessary identifier for the mowers for actions.
            This is looked up, if only mower_name or mower_id is provided
        :param first_action_value_param: first argument passed into the action value.
            For START_MOWING_FROM_POINT the duration in minutes (30 if empty);
            for START_MOWING the end time, e.g. '2023-08-12 20:50'
        :param second_action_value_param: second argument passed into the action value.
            For START_MOWING_FROM_POINT the start point ('0' if empty);
            for START_MOWING the start time, e.g. '2023-08-12 20:50'
        :param test_mode: Do not issue the request to the server
        :param kwargs: ``duration``/``endtime`` override the first parameter,
            ``startpoint``/``starttime`` the second; times are validated
        :return: the response of the request, or True in test mode
        :raises AttributeError: if no mower is specified, the external id is
            shorter than 16 characters, or the end time is not after the start time
        """
        time_format = "%Y-%m-%d %H:%M"

        if test_mode:
            logger.warning("TEST_MODE: Request will not be issued to server.")

        if not (mower_external_id or mower_id or mower_name):
            raise AttributeError(
                "Need some mower to work on. Please specify mower_[name|id|action_id]"
            )
        # Resolve the external id; the lookup by name is tried before the one by id.
        if not mower_external_id and mower_name:
            mower_external_id = await self.get_mower_action_id_from_name(
                mower_name
            )
        if not mower_external_id and mower_id:
            mower_external_id = await self.get_mower_action_id_from_id(
                mower_id
            )
        id_length = len(mower_external_id)
        if id_length < 16:
            raise AttributeError(
                f"Invalid mower_action_id, need exactly 16 chars, got {id_length} in {mower_external_id}"
            )

        url = f"{IMOW_API_URI}/mower-actions/"

        if len(kwargs) > 0:
            logger.debug(
                "Translating given intent **kwargs to action_value_param"
            )
            for key, value in kwargs.items():
                logger.debug(" {0} = {1}".format(key, value))
                if not value:
                    continue
                if key == "duration":
                    first_action_value_param = value
                elif key == "startpoint":
                    second_action_value_param = value
                elif key == "endtime":
                    first_action_value_param = validate_and_fix_datetime(value)
                elif key == "starttime":
                    second_action_value_param = validate_and_fix_datetime(
                        value
                    )
            logger.debug(
                f" -> first_action_value_param (end-time / duration): {first_action_value_param} "
            )
            logger.debug(
                f" -> second_action_value_param (start-time / startpoint): {second_action_value_param} "
            )

        logger.debug(
            f'Build action object for: {imow_action} -> "{imow_action.value}"'
        )
        # Build other action values depending on given ACTION
        if imow_action == IMowActions.START_MOWING_FROM_POINT:
            # Add the duration and startpoint parameter
            if first_action_value_param:
                duration = str(int(first_action_value_param) / 10)
            else:
                duration = 30 / 10
            if second_action_value_param:
                startpoint = str(second_action_value_param)
            else:
                startpoint = "0"
            action_value = f"{mower_external_id},{duration},{startpoint}"
        elif imow_action == IMowActions.START_MOWING:
            # by start- and/or endtime
            endtime = (
                None
                if first_action_value_param == ""
                else str(first_action_value_param)
            )
            starttime = (
                None
                if second_action_value_param == ""
                else str(second_action_value_param)
            )
            # Create some defaults
            if not endtime:
                if starttime:
                    # Run for 2 hours from start time if only a start time is given
                    endtime = (
                        datetime.strptime(starttime, time_format)
                        + timedelta(hours=2)
                    ).strftime(time_format)
                else:
                    # Run for 2 hours from now if no time is given
                    now = datetime.now()
                    logger.warning(
                        f"No start- or endtime is given. Creating an action object with endtime 2 hours from now"
                        f"based from this machines local timezone. datetime.now() gives {now.strftime('%Y-%m-%d %H:%M')}."
                    )
                    endtime = (now + timedelta(hours=2)).strftime(time_format)
            if starttime:
                # Make sure endtime is after starttime
                begins = datetime.strptime(starttime, time_format)
                ends = datetime.strptime(endtime, time_format)
                if not begins < ends:
                    raise AttributeError(
                        f"Time when to end: {endtime} is not afer time to start: {starttime}. This has to be until time travel."
                    )
                action_value = f"{mower_external_id},{endtime},{starttime}"
            else:
                action_value = f"{mower_external_id},{endtime}"
        else:
            action_value = mower_external_id

        action_object = {
            "actionName": imow_action.value,
            "actionValue": action_value
            # "0000000123456789,15,0" <MowerExternalId,DurationInMinutesDividedBy10,StartPoint>
            # "0000000123456789,<EndTime>,<StartTime>" <MowerExternalId,EndTime,StartTime>
        }
        logger.debug(
            f"Intent sent as request body to imow api for mower with identifier: '{mower_name}/{mower_id}/{mower_external_id}'"
        )
        logger.info(f" {action_object}")
        payload = json.dumps(action_object)

        if test_mode:
            logger.warning(
                f"TEST_MODE: (NOT) Created mower (extId:{mower_external_id}) ActionObject with contents:"
            )
            logger.warning(f" {action_object}")
            return True

        response = await self.api_request(url, "POST", payload=payload)
        if response.ok:
            logger.debug(
                f"Success: Created mower (extId:{mower_external_id}) ActionObject with contents:"
            )
            logger.debug(f" {action_object}")
            logger.debug(f" -> (HTTP Status {response.status})")
        else:
            logger.error(f"No success with mower-action: {payload}")
        return response

    async def update_setting(self, mower_id, setting, new_value) -> MowerState:
        """
        Change one setting of a mower.

        The current state is fetched first. If the setting differs from
        ``new_value`` the complete settings payload is sent with PUT and the
        state object is updated from the response; otherwise the state is
        fetched again and returned.

        :param mower_id: id of the mower to change
        :param setting: key of the payload field to change
        :param new_value: value to store for that field
        :return: the mower state after the operation
        :raises KeyError: if ``setting`` is not one of the payload fields
        """
        mower_state = await self.receive_mower_by_id(mower_id)
        payload_fields = {
            "id": mower_state.id,
            "unitFormat": mower_state.unitFormat,
            "name": mower_state.name,
            "teamable": mower_state.teamable,
            "accountId": mower_state.accountId,
            "childLock": mower_state.childLock,
            "corridorMode": mower_state.corridorMode,
            "mappingIntelligentHomeDrive": mower_state.mappingIntelligentHomeDrive,
            "rainSensorMode": mower_state.rainSensorMode,
            "edgeMowingMode": mower_state.edgeMowingMode,
            "asmEnabled": mower_state.asmEnabled,
            "gpsProtectionEnabled": mower_state.gpsProtectionEnabled,
            "automaticModeEnabled": mower_state.automaticModeEnabled,
            "localTimezoneOffset": mower_state.localTimezoneOffset,
            "mowingTimeManual": None,
            "mowingTime": None,
            "team": mower_state.team,
            "timeZone": mower_state.timeZone,
        }
        if payload_fields[setting] != new_value:
            payload_fields[setting] = new_value
            headers = {
                "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
                "Accept": "application/json, text/plain, */*",
                "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
                "Content-Type": "application/json",
                "Origin": "https://app.imow.stihl.com",
                "Connection": "keep-alive",
                "Referer": "https://app.imow.stihl.com/",
                "TE": "Trailers",
            }
            response = await self.api_request(
                url=f"{IMOW_API_URI}/mowers/{mower_state.id}/",
                method="PUT",
                payload=json.dumps(payload_fields, indent=2).encode("utf-8"),
                headers=headers,
            )
            mower_state.replace_state(json.loads(await response.text()))
            return mower_state
        else:
            logger.info(f"{setting} is already {new_value}.")
            return await self.receive_mower_by_id(mower_id)

    async def get_status_by_name(self, mower_name: str) -> dict:
        """
        Return the ``status`` of the first mower with the given name.

        :raises LookupError: if no mower has that name
        """
        logger.debug(f"get_status_by_name: {mower_name}")
        for mower in await self.receive_mowers():
            if mower.name == mower_name:
                return mower.status
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )

    async def get_status_by_id(self, mower_id=(str, int)) -> dict:
        """
        Return the ``status`` of the mower with the given id.

        :param mower_id: mower id; non-string values are converted with ``str()``.
            NOTE(review): the default is a tuple of types and looks like a
            misplaced annotation - confirm before relying on it.
        :raises LookupError: if the request fails with a ConnectionError
        """
        if not isinstance(mower_id, str):
            mower_id = str(mower_id)
        logger.debug(f"get_status_by_id: {mower_id}")
        try:
            response = await self.receive_mower_by_id(mower_id)
            return response.status
        except ConnectionError as err:
            raise LookupError(
                f"Mower with id {mower_id} not found in upstream"
            ) from err

    async def get_status_by_action_id(self, mower_action_id: str) -> dict:
        """
        Return the ``status`` of the first mower with the given externalId.

        :raises LookupError: if no mower has that externalId
        """
        logger.debug(f"get_status_by_action_id: {mower_action_id}")
        for mower in await self.receive_mowers():
            if mower.externalId == mower_action_id:
                return mower.status
        raise LookupError(
            f"Mower with externalId {mower_action_id} not found in upstream"
        )

    async def get_mower_action_id_from_name(self, mower_name: str) -> str:
        """
        Return the ``externalId`` of the first mower with the given name.

        :raises LookupError: if no mower has that name
        """
        logger.debug(f"get_mower_action_id_from_name: {mower_name}")
        for mower in await self.receive_mowers():
            if mower.name == mower_name:
                return mower.externalId
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )

    async def get_mower_action_id_from_id(self, mower_id: str) -> str:
        """
        Return the ``externalId`` of the mower with the given id.

        :raises LookupError: if the request fails with a ConnectionError
        """
        logger.debug(f"get_mower_action_id_from_id: {mower_id}")
        try:
            response = await self.receive_mower_by_id(mower_id)
            logger.debug(f" - {response.externalId}")
            return response.externalId
        except ConnectionError:
            raise LookupError(
                f"Mower with id {mower_id} not found in upstream"
            )

    async def get_mower_id_from_name(self, mower_name: str) -> str:
        """
        Return the ``id`` of the first mower with the given name.

        :raises LookupError: if no mower has that name
        """
        logger.debug(f"get_mower_id_from_name: {mower_name}")
        for mower in await self.receive_mowers():
            if mower.name == mower_name:
                return mower.id
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )

    async def receive_mowers(self) -> List[MowerState]:
        """
        Request all mowers and wrap each entry in a MowerState.

        :return: list of MowerState objects, one per entry of the response
        """
        logger.debug("receive_mowers: ")
        response = await self.api_request(f"{IMOW_API_URI}/mowers/", "GET")
        mowers = [
            MowerState(mower, self)
            for mower in json.loads(await response.text())
        ]
        for mower in mowers:
            logger.debug(f" - {mower.name}")
        return mowers

    async def receive_mower_by_name(self, mower_name: str) -> MowerState:
        """
        Return the first mower with the given name.

        :raises LookupError: if no mower has that name
        """
        logger.debug(f"get_mower_from_name: {mower_name}")
        for mower in await self.receive_mowers():
            if mower.name == mower_name:
                logger.debug(mower)
                return mower
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )

    async def receive_mower_by_id(self, mower_id: str) -> MowerState:
        """
        Request a single mower and wrap the response in a MowerState.

        :param mower_id: id of the mower, inserted into the request URL
        """
        logger.debug(f"receive_mower: {mower_id}")
        response = await self.api_request(
            f"{IMOW_API_URI}/mowers/{mower_id}/", "GET"
        )
        mower = MowerState(json.loads(await response.text()), self)
        logger.debug(mower)
        return mower

    async def receive_mower_statistics(self, mower_id: str) -> dict:
        """
        Request the statistics of one mower.

        :param mower_id: id of the mower, inserted into the request URL
        :return: the JSON-decoded response body
        """
        logger.debug(f"receive_mower_statistics: {mower_id}")
        response = await self.api_request(
            f"{IMOW_API_URI}/mowers/{mower_id}/statistic/", "GET"
        )
        stats = json.loads(await response.text())
        logger.debug(stats)
        return stats

    async def receive_mower_week_mow_time_in_hours(
        self, mower_id: str
    ) -> dict:
        """
        Request the week-mow-time-in-hours statistics of one mower.

        :param mower_id: id of the mower, inserted into the request URL
        :return: the JSON-decoded response body
        """
        logger.debug(f"receive_mower_week_mow_time_in_hours: {mower_id}")
        response = await self.api_request(
            f"{IMOW_API_URI}/mowers/{mower_id}/statistics/week-mow-time-in-hours/",
            "GET",
        )
        mow_times = json.loads(await response.text())
        logger.debug(mow_times)
        return mow_times

    async def receive_mower_start_points(self, mower_id: str) -> dict:
        """
        Request the start points configured for one mower.

        :param mower_id: id of the mower, inserted into the request URL
        :return: the JSON-decoded response body
        """
        logger.debug(f"receive_mower_start_points: {mower_id}")
        response = await self.api_request(
            f"{IMOW_API_URI}/mowers/{mower_id}/start-points/", "GET"
        )
        start_points = json.loads(await response.text())
        for startpoint in start_points:
            logger.debug(f" - {startpoint}")
        return start_points
Methods
async def api_logout(self)
-
Expand source code
async def api_logout(self):
    """
    Post the logout form to the OAuth endpoint and drop the stored cookies.

    A session is created first if none exists or the current one is closed.
    """
    if not self.http_session or self.http_session.closed:
        self.http_session = aiohttp.ClientSession(raise_for_status=True)
    async with self.http_session.post(
        "https://oauth2.imow.stihl.com/authentication/logout/",
        data={
            "csrf-token": self.csrf_token,
            "logoutUrl": "https://app.imow.stihl.com",
            "clientId": "9526273B-1477-47C6-801C-4356F58EF883",
            "cancelUrl": "https://app.imow.stihl.com",
        },
    ) as resp:
        await resp.read()
    # clear_domain() matches against cookie domains, so it must be given the
    # bare host name; a full URL never matches any stored cookie.
    self.http_session.cookie_jar.clear_domain("app.imow.stihl.com")
    self.http_session.cookie_jar.clear_domain("oauth2.imow.stihl.com")
async def api_request(self, url, method, payload=None, headers=None) ‑> aiohttp.client_reqrep.ClientResponse
-
Do a standardized request against the stihl imow webapi, with predefined headers :param url: The target URL :param method: The Method to use :param payload: optional payload :param headers: optional update headers :return: the aiohttp.ClientResponse
Expand source code
async def api_request(
    self, url, method, payload=None, headers=None
) -> aiohttp.ClientResponse:
    """
    Do a standardized request against the stihl imow webapi, with predefined headers

    Before the request a session is created if needed, the message
    catalogues are loaded once, and the token is renewed when its remaining
    lifetime has a ``days`` part of at most 1.

    :param url: The target URL
    :param method: The Method to use
    :param payload: optional payload
    :param headers: optional update headers
    :return: the aiohttp.ClientResponse
    """
    session_unusable = not self.http_session or self.http_session.closed
    if session_unusable:
        self.http_session = aiohttp.ClientSession(raise_for_status=True)

    if not self.messages_en:
        await self.fetch_messages()

    if self.token_expires:
        remaining = self.token_expires - datetime.now()
        if remaining.days <= 1:
            logger.info(
                "Fetching new access_token because old one expires in less than 1 day"
            )
            await self.get_token(force_reauth=True)

    # Built after the possible re-authentication so that the Authorization
    # header carries the current token.
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
        "Authorization": f'Bearer {self.access_token or ""}',
        "Origin": "https://app.imow.stihl.com",
        "DNT": "1",
        "Connection": "keep-alive",
        "Referer": "https://app.imow.stihl.com/",
        "Pragma": "no-cache",
        "Cache-Control": "no-cache",
        "TE": "Trailers",
        "Content-Type": "application/json",
    }
    if headers:
        request_headers.update(headers)

    try:
        async with self.http_session.request(
            method, url, headers=request_headers, data=payload or {}
        ) as response:
            # Read the body inside the context so it stays available on the
            # returned response object.
            await response.read()
            response.raise_for_status()
            return response
    except ClientResponseError as error:
        server_error = error.status == 500
        if server_error:
            await self.check_api_maintenance()
        raise error
async def check_api_maintenance(self) ‑> None
-
Expand source code
async def check_api_maintenance(self) -> None:
    """
    Query the maintenance endpoint and raise if the API is impaired.

    :raises ApiMaintenanceError: if the reported status has a truthy
        ``serverDisrupted`` or ``serverDown`` entry
    """
    maintenance_url = "https://app-api-maintenance-r-euwe-4bf2d8.azurewebsites.net/maintenance/"
    reply = await self.api_request(
        maintenance_url, "GET", headers={"Authorization": ""}
    )
    status = json.loads(await reply.text())
    logger.debug(status)
    if not (status["serverDisrupted"] or status["serverDown"]):
        return
    raise ApiMaintenanceError(
        f"iMow API is under Maintenance -> "
        f'serverDisrupted: {status["serverDisrupted"]}, serverDown: {status["serverDown"]}, '
        f'affectedTill {status["affectedTill"]}'
    )
async def close(self)
-
Cleanup the aiohttp Session
Expand source code
async def close(self):
    """Cleanup the aiohttp Session"""
    session = self.http_session
    # Nothing to clean up if no session was ever created or it is already
    # closed; previously this raised AttributeError on None.
    if session is None or session.closed:
        return
    await asyncio.sleep(0.250)
    await session.close()
async def fetch_messages(self)
-
Expand source code
async def fetch_messages(self):
    """
    Load the English message catalogue and the one for ``self.lang``.

    The results are stored in ``messages_en`` and ``messages_user``; for
    ``lang == "en"`` both refer to the same object.

    :raises LanguageNotFoundError: if a catalogue request answers with HTTP 404
    :raises ClientResponseError: for any other HTTP error status
    """
    # Same session guard as api_request()/api_logout(), so the method also
    # works when it is the first call made on the instance.
    if not self.http_session or self.http_session.closed:
        self.http_session = aiohttp.ClientSession(raise_for_status=True)
    try:
        url_en = (
            "https://app.imow.stihl.com/assets/i18n/animations/en.json"
        )
        async with self.http_session.request("GET", url_en) as response_en:
            i18n_en = json.loads(await response_en.text())
            self.messages_en = Messages(i18n_en)
        if self.lang != "en":
            url_user = f"https://app.imow.stihl.com/assets/i18n/animations/{self.lang}.json"
            async with self.http_session.request(
                "GET", url_user
            ) as response_user:
                i18n_user = json.loads(await response_user.text())
                self.messages_user = Messages(i18n_user)
        else:
            self.messages_user = self.messages_en
    except ClientResponseError as e:
        if e.status == 404:
            raise LanguageNotFoundError(
                f"Language-File '{self.lang}.json' not found on imow upstream ("
                f"https://app.imow.stihl.com/assets/i18n/animations/{self.lang}.json)"
            ) from e
        # Any other HTTP error used to be dropped silently, leaving the
        # catalogues unset; let the caller see the real cause instead.
        raise
async def get_mower_action_id_from_id(self, mower_id: str) ‑> str
-
Expand source code
async def get_mower_action_id_from_id(self, mower_id: str) -> str:
    """
    Return the ``externalId`` of the mower with the given id.

    :param mower_id: id of the mower to look up
    :raises LookupError: if the request fails with a ConnectionError
    """
    logger.debug(f"get_mower_action_id_from_id: {mower_id}")
    try:
        mower = await self.receive_mower_by_id(mower_id)
        logger.debug(f" - {mower.externalId}")
        return mower.externalId
    except ConnectionError:
        not_found = f"Mower with id {mower_id} not found in upstream"
        raise LookupError(not_found)
async def get_mower_action_id_from_name(self, mower_name: str) ‑> str
-
Expand source code
async def get_mower_action_id_from_name(self, mower_name: str) -> str:
    """
    Return the ``externalId`` of the first mower with the given name.

    :param mower_name: name to search for in the list of all mowers
    :raises LookupError: if no mower has that name
    """
    logger.debug(f"get_mower_action_id_from_name: {mower_name}")
    all_mowers = await self.receive_mowers()
    match = next(
        (entry for entry in all_mowers if entry.name == mower_name), None
    )
    if match is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    return match.externalId
async def get_mower_id_from_name(self, mower_name: str) ‑> str
-
Expand source code
async def get_mower_id_from_name(self, mower_name: str) -> str:
    """
    Return the ``id`` of the first mower with the given name.

    :param mower_name: name to search for in the list of all mowers
    :raises LookupError: if no mower has that name
    """
    logger.debug(f"get_mower_id_from_name: {mower_name}")
    all_mowers = await self.receive_mowers()
    match = next(
        (entry for entry in all_mowers if entry.name == mower_name), None
    )
    if match is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    return match.id
async def get_status_by_action_id(self, mower_action_id: str) ‑> dict
-
Expand source code
async def get_status_by_action_id(self, mower_action_id: str) -> dict:
    """
    Return the ``status`` of the first mower with the given externalId.

    :param mower_action_id: externalId to search for in the list of all mowers
    :raises LookupError: if no mower has that externalId
    """
    logger.debug(f"get_status_by_action_id: {mower_action_id}")
    all_mowers = await self.receive_mowers()
    match = next(
        (
            entry
            for entry in all_mowers
            if entry.externalId == mower_action_id
        ),
        None,
    )
    if match is None:
        raise LookupError(
            f"Mower with externalId {mower_action_id} not found in upstream"
        )
    return match.status
async def get_status_by_id(self, mower_id=(<class 'str'>, <class 'int'>)) ‑> dict
-
Expand source code
async def get_status_by_id(self, mower_id=(str, int)) -> dict:
    """
    Return the ``status`` of the mower with the given id.

    :param mower_id: mower id; non-string values are converted with ``str()``.
        NOTE(review): the default is a tuple of types and looks like a
        misplaced annotation - confirm before relying on it.
    :raises LookupError: if the request fails with a ConnectionError
    """
    # isinstance() instead of an exact type comparison
    if not isinstance(mower_id, str):
        mower_id = str(mower_id)
    logger.debug(f"get_status_by_id: {mower_id}")
    try:
        response = await self.receive_mower_by_id(mower_id)
        return response.status
    except ConnectionError as err:
        # keep the original failure attached as the explicit cause
        raise LookupError(
            f"Mower with id {mower_id} not found in upstream"
        ) from err
async def get_status_by_name(self, mower_name: str) ‑> dict
-
Expand source code
async def get_status_by_name(self, mower_name: str) -> dict:
    """
    Return the ``status`` of the first mower with the given name.

    :param mower_name: name to search for in the list of all mowers
    :raises LookupError: if no mower has that name
    """
    logger.debug(f"get_status_by_name: {mower_name}")
    all_mowers = await self.receive_mowers()
    match = next(
        (entry for entry in all_mowers if entry.name == mower_name), None
    )
    if match is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    return match.status
async def get_token(self, email: str = '', password: str = '', force_reauth=False, return_expire_time=False) ‑> Union[Tuple[str, datetime.datetime], str]
-
look for a token, if present, return. Else authenticate and store new token :param return_expire_time: :param email: stihl webapp login email non-url-encoded :param password: stihl webapp login password :param force_reauth: Force a re-authentication with username and password :return: tuple, the access token and a datetime object containing the expire date
Expand source code
async def get_token(
    self,
    email: str = "",
    password: str = "",
    force_reauth=False,
    return_expire_time=False,
) -> Union[Tuple[str, datetime], str]:
    """
    look for a token, if present, return. Else authenticate and store new token

    :param return_expire_time: also return the expiry datetime
    :param email: stihl webapp login email non-url-encoded
    :param password: stihl webapp login password
    :param force_reauth: Force a re-authentication with username and password
    :return: the access token, or a tuple of the access token and a datetime
        object containing the expire date
    :raises LoginError: if authentication is needed but email or password is missing
    """
    if not self.access_token or force_reauth:
        if email and password:
            self.api_password = password
            self.api_email = email
        if force_reauth:
            await self.api_logout()
            self.csrf_token = ""
            self.requestId = ""
            self.access_token: str = ""
            self.token_expires: datetime = None
        # Both values are required: with only one of them the login would
        # fail later while url-encoding the missing value.
        if not self.api_email or not self.api_password:
            raise LoginError(
                "Got no credentials to authenticate, please provide"
            )
        await self.__authenticate(self.api_email, self.api_password)
        logger.debug("Get Token: Re-Authenticate")

    await self.validate_token()
    if return_expire_time:
        return self.access_token, self.token_expires
    else:
        return self.access_token
async def intent(self, imow_action: IMowActions, mower_name: str = '', mower_id: str = '', mower_external_id: str = '', first_action_value_param: any = '', second_action_value_param: any = '', test_mode: bool = False, **kwargs) ‑> aiohttp.client_reqrep.ClientResponse
-
Intent to do an action. This seems to create a job object upstream. The action object contains an action Enum; the action value is `<MowerExternalId>`,
or `<MowerExternalId,DurationInMinutesDividedBy10,StartPoint>` if startMowing is chosen. :param imow_action: Anything from imow.common.actions :param mower_name: sth to identify which mower is used :param mower_id: sth to identify which mower is used :param mower_external_id: necessary identifier for the mowers for actions. This is looked up, if only mower_name or mower_id is provided
:param first_action_value_param: first argument passed into the action call request to the api. Can be one of the following contents: A duration: minutes of intended mowing. Used by START_MOWING_FROM_POINT. Defaults to '30' minutes. An endtime: a datetime when to stop mowing, e.g. '2023-08-12 20:50'. Used by START_MOWING.
:param second_action_value_param: second argument passed into the action call request to the api. Can be one of the following contents: A startpoint: from which the mowing shall start. Used by START_MOWING_FROM_POINT. Defaults to '0'. A starttime: a datetime when to start mowing, e.g. '2023-08-12 20:50'. Used by START_MOWING. :param test_mode: Do not issue the request to the server :return:
Expand source code
async def intent(
    self,
    imow_action: IMowActions,
    mower_name: str = "",
    mower_id: str = "",
    mower_external_id: str = "",
    first_action_value_param: any = "",
    second_action_value_param: any = "",
    test_mode: bool = False,
    **kwargs,
) -> aiohttp.ClientResponse:
    """
    Intent to do an action. This seems to create a job object upstream.

    The action object contains the action name and an action value. The
    value is ``<MowerExternalId>``,
    ``<MowerExternalId,DurationInMinutesDividedBy10,StartPoint>`` for
    START_MOWING_FROM_POINT, or ``<MowerExternalId,EndTime[,StartTime]>``
    for START_MOWING.

    :param imow_action: Anything from imow.common.actions
    :param mower_name: sth to identify which mower is used
    :param mower_id: sth to identify which mower is used
    :param mower_external_id: necessary identifier for the mowers for actions.
        This is looked up, if only mower_name or mower_id is provided
    :param first_action_value_param: first argument passed into the action value.
        For START_MOWING_FROM_POINT the duration in minutes (30 if empty);
        for START_MOWING the end time, e.g. '2023-08-12 20:50'
    :param second_action_value_param: second argument passed into the action value.
        For START_MOWING_FROM_POINT the start point ('0' if empty);
        for START_MOWING the start time, e.g. '2023-08-12 20:50'
    :param test_mode: Do not issue the request to the server
    :param kwargs: ``duration``/``endtime`` override the first parameter,
        ``startpoint``/``starttime`` the second; times are validated
    :return: the response of the request, or True in test mode
    :raises AttributeError: if no mower is specified, the external id is
        shorter than 16 characters, or the end time is not after the start time
    """
    time_format = "%Y-%m-%d %H:%M"

    if test_mode:
        logger.warning("TEST_MODE: Request will not be issued to server.")

    if not (mower_external_id or mower_id or mower_name):
        raise AttributeError(
            "Need some mower to work on. Please specify mower_[name|id|action_id]"
        )
    # Resolve the external id; the lookup by name is tried before the one by id.
    if not mower_external_id and mower_name:
        mower_external_id = await self.get_mower_action_id_from_name(
            mower_name
        )
    if not mower_external_id and mower_id:
        mower_external_id = await self.get_mower_action_id_from_id(mower_id)
    id_length = len(mower_external_id)
    if id_length < 16:
        raise AttributeError(
            f"Invalid mower_action_id, need exactly 16 chars, got {id_length} in {mower_external_id}"
        )

    url = f"{IMOW_API_URI}/mower-actions/"

    if len(kwargs) > 0:
        logger.debug(
            "Translating given intent **kwargs to action_value_param"
        )
        for key, value in kwargs.items():
            logger.debug(" {0} = {1}".format(key, value))
            if not value:
                continue
            if key == "duration":
                first_action_value_param = value
            elif key == "startpoint":
                second_action_value_param = value
            elif key == "endtime":
                first_action_value_param = validate_and_fix_datetime(value)
            elif key == "starttime":
                second_action_value_param = validate_and_fix_datetime(value)
        logger.debug(
            f" -> first_action_value_param (end-time / duration): {first_action_value_param} "
        )
        logger.debug(
            f" -> second_action_value_param (start-time / startpoint): {second_action_value_param} "
        )

    logger.debug(
        f'Build action object for: {imow_action} -> "{imow_action.value}"'
    )
    # Build other action values depending on given ACTION
    if imow_action == IMowActions.START_MOWING_FROM_POINT:
        # Add the duration and startpoint parameter
        if first_action_value_param:
            duration = str(int(first_action_value_param) / 10)
        else:
            duration = 30 / 10
        if second_action_value_param:
            startpoint = str(second_action_value_param)
        else:
            startpoint = "0"
        action_value = f"{mower_external_id},{duration},{startpoint}"
    elif imow_action == IMowActions.START_MOWING:
        # by start- and/or endtime
        endtime = (
            None
            if first_action_value_param == ""
            else str(first_action_value_param)
        )
        starttime = (
            None
            if second_action_value_param == ""
            else str(second_action_value_param)
        )
        # Create some defaults
        if not endtime:
            if starttime:
                # Run for 2 hours from start time if only a start time is given
                endtime = (
                    datetime.strptime(starttime, time_format)
                    + timedelta(hours=2)
                ).strftime(time_format)
            else:
                # Run for 2 hours from now if no time is given
                now = datetime.now()
                logger.warning(
                    f"No start- or endtime is given. Creating an action object with endtime 2 hours from now"
                    f"based from this machines local timezone. datetime.now() gives {now.strftime('%Y-%m-%d %H:%M')}."
                )
                endtime = (now + timedelta(hours=2)).strftime(time_format)
        if starttime:
            # Make sure endtime is after starttime
            begins = datetime.strptime(starttime, time_format)
            ends = datetime.strptime(endtime, time_format)
            if not begins < ends:
                raise AttributeError(
                    f"Time when to end: {endtime} is not afer time to start: {starttime}. This has to be until time travel."
                )
            action_value = f"{mower_external_id},{endtime},{starttime}"
        else:
            action_value = f"{mower_external_id},{endtime}"
    else:
        action_value = mower_external_id

    action_object = {
        "actionName": imow_action.value,
        "actionValue": action_value
        # "0000000123456789,15,0" <MowerExternalId,DurationInMinutesDividedBy10,StartPoint>
        # "0000000123456789,<EndTime>,<StartTime>" <MowerExternalId,EndTime,StartTime>
    }
    logger.debug(
        f"Intent sent as request body to imow api for mower with identifier: '{mower_name}/{mower_id}/{mower_external_id}'"
    )
    logger.info(f" {action_object}")
    payload = json.dumps(action_object)

    if test_mode:
        logger.warning(
            f"TEST_MODE: (NOT) Created mower (extId:{mower_external_id}) ActionObject with contents:"
        )
        logger.warning(f" {action_object}")
        return True

    response = await self.api_request(url, "POST", payload=payload)
    if response.ok:
        logger.debug(
            f"Success: Created mower (extId:{mower_external_id}) ActionObject with contents:"
        )
        logger.debug(f" {action_object}")
        logger.debug(f" -> (HTTP Status {response.status})")
    else:
        logger.error(f"No success with mower-action: {payload}")
    return response
async def receive_mower_by_id(self, mower_id: str) ‑> MowerState
-
Expand source code
async def receive_mower_by_id(self, mower_id: str) -> MowerState:
    """
    Request one mower from the API and wrap the answer in a MowerState.

    :param mower_id: identifier inserted into the ``/mowers/<mower_id>/`` URL
    :return: MowerState built from the JSON-decoded response body and this
        api instance
    """
    logger.debug(f"receive_mower: {mower_id}")
    mower_url = f"{IMOW_API_URI}/mowers/{mower_id}/"
    response = await self.api_request(mower_url, "GET")
    raw_state = json.loads(await response.text())
    mower = MowerState(raw_state, self)
    logger.debug(mower)
    return mower
async def receive_mower_by_name(self, mower_name: str) ‑> MowerState
-
Expand source code
async def receive_mower_by_name(self, mower_name: str) -> MowerState:
    """
    Look up a mower by its name among all mowers returned by ``receive_mowers``.

    :param mower_name: name compared (exact match) against each mower's ``name``
    :return: the first mower whose name equals ``mower_name``
    :raises LookupError: when no mower carries that name
    """
    logger.debug(f"get_mower_from_name: {mower_name}")
    known_mowers = await self.receive_mowers()
    # First match wins, mirroring a front-to-back scan of the list.
    match = next(
        (
            candidate
            for candidate in known_mowers
            if candidate.name == mower_name
        ),
        None,
    )
    if match is None:
        raise LookupError(
            f"Mower with name {mower_name} not found in upstream"
        )
    logger.debug(match)
    return match
async def receive_mower_start_points(self, mower_id: str) ‑> dict
-
Expand source code
async def receive_mower_start_points(self, mower_id: str) -> dict:
    """
    Fetch the start points configured for a mower.

    :param mower_id: identifier inserted into the
        ``/mowers/<mower_id>/start-points/`` URL
    :return: the JSON-decoded response body; every element is also written
        to the debug log
    """
    logger.debug(f"receive_mower_start_points: {mower_id}")
    endpoint = f"{IMOW_API_URI}/mowers/{mower_id}/start-points/"
    response = await self.api_request(endpoint, "GET")
    body = await response.text()
    start_points = json.loads(body)
    for entry in start_points:
        logger.debug(f" - {entry}")
    return start_points
async def receive_mower_statistics(self, mower_id: str) ‑> dict
-
Expand source code
async def receive_mower_statistics(self, mower_id: str) -> dict:
    """
    Fetch the statistics document of a mower.

    :param mower_id: identifier inserted into the
        ``/mowers/<mower_id>/statistic/`` URL
    :return: the JSON-decoded response body
    """
    logger.debug(f"receive_mower_statistics: {mower_id}")
    endpoint = f"{IMOW_API_URI}/mowers/{mower_id}/statistic/"
    response = await self.api_request(endpoint, "GET")
    body = await response.text()
    statistics = json.loads(body)
    logger.debug(statistics)
    return statistics
async def receive_mower_week_mow_time_in_hours(self, mower_id: str) ‑> dict
-
Expand source code
async def receive_mower_week_mow_time_in_hours(
    self, mower_id: str
) -> dict:
    """
    Fetch the "week mow time in hours" statistic of a mower.

    :param mower_id: identifier inserted into the
        ``/mowers/<mower_id>/statistics/week-mow-time-in-hours/`` URL
    :return: the JSON-decoded response body
    """
    logger.debug(f"receive_mower_week_mow_time_in_hours: {mower_id}")
    endpoint = (
        f"{IMOW_API_URI}/mowers/{mower_id}/statistics/week-mow-time-in-hours/"
    )
    response = await self.api_request(endpoint, "GET")
    body = await response.text()
    weekly_hours = json.loads(body)
    logger.debug(weekly_hours)
    return weekly_hours
async def receive_mowers(self) ‑> List[MowerState]
-
Expand source code
async def receive_mowers(self) -> List[MowerState]:
    """
    Fetch all mowers from the ``/mowers/`` endpoint.

    :return: one MowerState per element of the JSON-decoded response body,
        in response order; each mower's name is written to the debug log
    """
    logger.debug("receive_mowers: ")
    response = await self.api_request(f"{IMOW_API_URI}/mowers/", "GET")
    raw_mowers = json.loads(await response.text())
    # Build every state object first, then log, so a failing construction
    # aborts before any name is logged.
    mowers = [MowerState(raw_mower, self) for raw_mower in raw_mowers]
    for state in mowers:
        logger.debug(f" - {state.name}")
    return mowers
async def update_setting(self, mower_id, setting, new_value) ‑> MowerState
-
Expand source code
async def update_setting(self, mower_id, setting, new_value) -> MowerState:
    """
    Change one setting of a mower via a PUT on ``/mowers/<id>/``.

    The current state is fetched first and its fields are mirrored into the
    request payload; only ``setting`` is replaced by ``new_value``. When the
    setting already has that value no PUT is sent and the mower is simply
    fetched again.

    :param mower_id: identifier of the mower to update
    :param setting: payload key to change; must be one of the keys of the
        payload built below, otherwise the lookup raises ``KeyError``
    :param new_value: value to store under ``setting``
    :return: the MowerState, refreshed from the PUT response (or freshly
        fetched when nothing had to change)
    """
    mower_state = await self.receive_mower_by_id(mower_id)

    # Fields copied verbatim from the current state. They are split in two
    # groups because the two mowing-time keys sit between them in the payload
    # and the key order of the serialized JSON is kept stable.
    leading_fields = (
        "id",
        "unitFormat",
        "name",
        "teamable",
        "accountId",
        "childLock",
        "corridorMode",
        "mappingIntelligentHomeDrive",
        "rainSensorMode",
        "edgeMowingMode",
        "asmEnabled",
        "gpsProtectionEnabled",
        "automaticModeEnabled",
        "localTimezoneOffset",
    )
    trailing_fields = ("team", "timeZone")

    payload_fields = {
        field: getattr(mower_state, field) for field in leading_fields
    }
    payload_fields["mowingTimeManual"] = None
    payload_fields["mowingTime"] = None
    for field in trailing_fields:
        payload_fields[field] = getattr(mower_state, field)

    # Guard clause: nothing to send when the value is already in place.
    if payload_fields[setting] == new_value:
        logger.info(f"{setting} is already {new_value}.")
        return await self.receive_mower_by_id(mower_id)

    payload_fields[setting] = new_value
    request_headers = {
        "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
        "Content-Type": "application/json",
        "Origin": "https://app.imow.stihl.com",
        "Connection": "keep-alive",
        "Referer": "https://app.imow.stihl.com/",
        "TE": "Trailers",
    }
    request_body = json.dumps(payload_fields, indent=2).encode("utf-8")
    response = await self.api_request(
        url=f"{IMOW_API_URI}/mowers/{mower_state.id}/",
        method="PUT",
        payload=request_body,
        headers=request_headers,
    )
    updated_state = json.loads(await response.text())
    mower_state.replace_state(updated_state)
    return mower_state
async def validate_token(self, explicit_token: str = None) ‑> bool
-
Expand source code
async def validate_token(self, explicit_token: str = None) -> bool:
    """
    Validate a token by performing a ``receive_mowers`` request with it.

    :param explicit_token: optional token to check instead of the instance
        token. While the check runs it temporarily replaces
        ``self.access_token``; the previous token is put back afterwards,
        whether the request succeeded or raised.
    :return: True when ``receive_mowers`` completes without raising
    :raises Exception: anything raised by ``receive_mowers`` is propagated
        unchanged to the caller
    """
    if not explicit_token:
        # Nothing to swap: validate the token the instance already holds.
        await self.receive_mowers()
        return True

    previous_token = self.access_token
    self.access_token = explicit_token
    try:
        await self.receive_mowers()
    finally:
        # Restore on failure too; otherwise a rejected token would stay
        # installed on the instance and break every subsequent request.
        self.access_token = previous_token
    return True