Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 64 additions & 40 deletions pysuez/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import requests
import re
import datetime
import asyncio
import aiohttp



Expand All @@ -16,7 +17,7 @@ class PySuezError(Exception):
class SuezClient():
"""Global variables."""

def __init__(self, username, password, counter_id, provider, session=None, timeout=None):
def __init__(self, username, password, counter_id, provider=None, timeout=None):
"""Initialize the client object."""
self._username = username
self._password = password
Expand All @@ -27,11 +28,25 @@ def __init__(self, username, password, counter_id, provider, session=None, timeo
self.data = {}
self.attributes = {}
self.success = False
self._session = session
self._session = None
self._timeout = timeout
self.state = 0

def _get_token(self):
def _get_token_1(self, content):
phrase = re.compile('csrf_token(.*)')
result = phrase.search(content)
if result is None:
return None
return result.group(1)

def _get_token_2(self, content):
phrase = re.compile('csrfToken\\\\u0022\\\\u003A\\\\u0022(.*)\\\\u0022,\\\\u0022')
result = phrase.search(content)
if result is None:
return None
return result.group(1).encode().decode('unicode_escape')

async def _get_token(self):
"""Get the token"""
headers = {
'Accept': "application/json, text/javascript, */*; q=0.01",
Expand All @@ -47,25 +62,19 @@ def _get_token(self):
BASE_URI = 'https://www.eau-olivet.fr'
url = BASE_URI+API_ENDPOINT_LOGIN

response = requests.get(url, headers=headers, timeout=self._timeout)

headers['Cookie'] = ""
for key in response.cookies.get_dict():
if headers['Cookie']:
headers['Cookie'] += "; "
headers['Cookie'] += key + "=" + response.cookies[key]
response = await self._session.get(url, headers=headers, timeout=self._timeout)

phrase = re.compile('_csrf_token" value="(.*)"/>')
result = phrase.search(response.content.decode('utf-8'))
self._token = result.group(1)
headers['Cookie'] = "; ".join([f"{key}={value}" for (key, value) in response.cookies.items()])
self._headers = headers
decoded_content = (await response.read()).decode('utf-8')
self._token = self._get_token_1(decoded_content) or self._get_token_2(decoded_content)
if self._token is None:
raise PySuezError("Can't get token.")

def _get_cookie(self):
async def _get_cookie(self):
"""Connect and get the cookie"""
if self._session is None:
self._session = requests.Session()

self._get_token()
await self._get_token()
data = {
'_username': self._username,
'_password': self._password,
Expand All @@ -77,23 +86,23 @@ def _get_cookie(self):
}
url = BASE_URI+API_ENDPOINT_LOGIN
try:
self._session.post(url,
response = await self._session.post(url,
headers=self._headers,
data=data,
allow_redirects=False,
timeout=self._timeout)
except OSError:
raise PySuezError("Can not submit login form.")

if not 'eZSESSID' in self._session.cookies.get_dict():
if not 'eZSESSID' in response.cookies.keys():
raise PySuezError("Login error: Please check your username/password.")

self._headers['Cookie'] = ''
self._headers['Cookie'] = 'eZSESSID='+self._session.cookies.get("eZSESSID")
self._headers['Cookie'] = f"eZSESSID={response.cookies['eZSESSID']}"
return True


def _fetch_data(self):
async def _fetch_data(self):
"""Fetch latest data from Suez."""
now = datetime.datetime.now()
today_year = now.strftime("%Y")
Expand All @@ -108,12 +117,12 @@ def _fetch_data(self):
yesterday_month, self._counter_id
)

self._get_cookie()

data = requests.get(url, headers=self._headers)
await self._get_cookie()

data = await self._session.get(url, headers=self._headers)
json = await data.json()
try:
self.state = int(float(data.json()[int(
self.state = int(float(json[int(
yesterday_day)-1][1])*1000)
self.success = True
self.attributes['attribution'] = "Data provided by toutsurmoneau.fr"
Expand All @@ -129,10 +138,10 @@ def _fetch_data(self):
today_year,
today_month, self._counter_id
)
data = requests.get(url, headers=self._headers)
data = await self._session.get(url, headers=self._headers)

self.attributes['thisMonthConsumption'] = {}
for item in data.json():
for item in json:
self.attributes['thisMonthConsumption'][item[0]] = int(
float(item[1])*1000)

Expand All @@ -154,10 +163,11 @@ def _fetch_data(self):
self._counter_id
)

data = requests.get(url, headers=self._headers)
data = await self._session.get(url, headers=self._headers)
json = await data.json()

self.attributes['previousMonthConsumption'] = {}
for item in data.json():
for item in json:
self.attributes['previousMonthConsumption'][item[0]] = int(
float(item[1])*1000)

Expand All @@ -169,8 +179,8 @@ def _fetch_data(self):
url = BASE_URI+API_ENDPOINT_HISTORY
url += '{}'.format(self._counter_id)

data = requests.get(url, headers=self._headers)
fetched_data = data.json()
data = await self._session.get(url, headers=self._headers)
fetched_data = await data.json()
self.attributes['highestMonthlyConsumption'] = int(
float(fetched_data[-1])*1000)
fetched_data.pop()
Expand All @@ -190,11 +200,8 @@ def _fetch_data(self):
raise PySuezError("Issue with history data")
pass

def check_credentials(self):
if self._session is None:
self._session = requests.Session()

self._get_token()
async def _check_credentials(self):
await self._get_token()
data = {
'_username': self._username,
'_password': self._password,
Expand All @@ -207,7 +214,7 @@ def check_credentials(self):
url = BASE_URI+API_ENDPOINT_LOGIN

try:
self._session.post(url,
await self._session.post(url,
headers=self._headers,
data=data,
allow_redirects=False,
Expand All @@ -234,9 +241,9 @@ def check_credentials(self):
#else:
# return False

def update(self):
async def _update(self):
"""Return the latest collected data from Linky."""
self._fetch_data()
await self._fetch_data()
if not self.success:
return
return self.attributes
Expand All @@ -246,4 +253,21 @@ def close_session(self):
self._session.close()
self._session = None

async def update_async(self):
"""Asynchronous update"""
async with aiohttp.ClientSession() as self._session:
return await self._update()

async def check_credentials_async(self):
"""Asynchronous check_credential"""
async with aiohttp.ClientSession() as self._session:
return await self._check_credentials()

def update(self):
"""Synchronous update"""
return asyncio.run(self.update_async())

def check_credentials(self):
"""Asynchronous check_credential"""
return asyncio.run(self.check_credentials())