Skip to content

Commit 8df1bb6

Browse files
author
lbbrhzn
authored
improve reconnect and trigger boot notification (#341)
* improve reconnect and trigger boot notification * cancel tasks on close and reconnect * improve exception handling Co-authored-by: lbbrhzn <@lbbrhzn>
1 parent c9cd7b3 commit 8df1bb6

File tree

2 files changed

+76
-58
lines changed

2 files changed

+76
-58
lines changed

.devcontainer/configuration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ default_config:
33
logger:
44
default: info
55
logs:
6-
custom_components.occp: debug
6+
custom_components.ocpp: debug
77

88
# If you need to debug uncomment the line below (doc: https://www.home-assistant.io/integrations/debugpy/)
99
# debugpy:

custom_components/ocpp/api.py

Lines changed: 75 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,9 @@ async def create(hass: HomeAssistant, entry: ConfigEntry):
153153
self._server = server
154154
return self
155155

156-
async def on_connect(self, websocket: websockets.connection, path: str):
156+
async def on_connect(
157+
self, websocket: websockets.server.WebSocketServerProtocol, path: str
158+
):
157159
"""Request handler executed for every new OCPP connection."""
158160

159161
try:
@@ -180,20 +182,26 @@ async def on_connect(self, websocket: websockets.connection, path: str):
180182
cp_id = cp_id[cp_id.rfind("/") + 1 :]
181183
try:
182184
if self.cpid not in self.charge_points:
183-
_LOGGER.info(f"Charger {cp_id} connected to {self.host}:{self.port}.")
185+
_LOGGER.info(
186+
f"Charger {cp_id} connected to {self.host}:{self.port} websocket={websocket.id}."
187+
)
184188
charge_point = ChargePoint(
185189
cp_id, websocket, self.hass, self.entry, self
186190
)
187191
self.charge_points[self.cpid] = charge_point
188192
await charge_point.start()
189193
else:
190-
_LOGGER.info(f"Charger {cp_id} reconnected to {self.host}:{self.port}.")
194+
_LOGGER.info(
195+
f"Charger {cp_id} reconnected to {self.host}:{self.port} websocket={websocket.id}."
196+
)
191197
charge_point: ChargePoint = self.charge_points[self.cpid]
192198
await charge_point.reconnect(websocket)
193199
except Exception as e:
194200
_LOGGER.error(f"Exception occurred:\n{e}", exc_info=True)
195201
finally:
196-
_LOGGER.info(f"Charger {cp_id} disconnected from {self.host}:{self.port}.")
202+
_LOGGER.info(
203+
f"Charger {cp_id} disconnected from {self.host}:{self.port} websocket={websocket.id}."
204+
)
197205

198206
def get_metric(self, cp_id: str, measurand: str):
199207
"""Return last known value for given measurand."""
@@ -282,7 +290,7 @@ class ChargePoint(cp):
282290
def __init__(
283291
self,
284292
id: str,
285-
connection: websockets.connection,
293+
connection: websockets.server.WebSocketServerProtocol,
286294
hass: HomeAssistant,
287295
entry: ConfigEntry,
288296
central: CentralSystem,
@@ -300,6 +308,9 @@ def __init__(
300308
self._requires_reboot = False
301309
self.preparing = asyncio.Event()
302310
self._transactionId = 0
311+
self.triggered_boot_notification = False
312+
self.received_boot_notification = False
313+
self.tasks = None
303314
self._metrics = defaultdict(lambda: Metric(None, None))
304315
self._metrics[cdet.identifier.value].value = id
305316
self._metrics[csess.session_time.value].unit = TIME_MINUTES
@@ -367,7 +378,8 @@ async def handle_data_transfer(call):
367378
await asyncio.sleep(2)
368379
await self.get_supported_features()
369380
if prof.REM in self._attr_supported_features:
370-
await self.trigger_boot_notification()
381+
if self.received_boot_notification is False:
382+
await self.trigger_boot_notification()
371383
await self.trigger_status_notification()
372384
await self.become_operative()
373385
await self.get_configuration(ckey.heartbeat_interval.value)
@@ -457,8 +469,10 @@ async def trigger_boot_notification(self):
457469
)
458470
resp = await self.call(req)
459471
if resp.status == TriggerMessageStatus.accepted:
472+
self.triggered_boot_notification = True
460473
return True
461474
else:
475+
self.triggered_boot_notification = False
462476
_LOGGER.warning("Failed with response: %s", resp.status)
463477
return False
464478

@@ -784,40 +798,29 @@ async def monitor_connection(self):
784798
self._metrics[cstat.latency_ping.value].unit = "ms"
785799
self._metrics[cstat.latency_pong.value].unit = "ms"
786800

801+
connection = self._connection
787802
try:
788-
while True:
789-
if self._connection.open is False:
790-
_LOGGER.debug(f"Connection not open '{self.id}'")
791-
await asyncio.sleep(timeout)
792-
continue
793-
t0 = time.perf_counter()
794-
pong_waiter = await asyncio.wait_for(
795-
self._connection.ping(), timeout=timeout
796-
)
797-
t1 = time.perf_counter()
803+
while connection.open:
804+
time0 = time.perf_counter()
805+
latency_ping = timeout * 1000
806+
pong_waiter = await asyncio.wait_for(connection.ping(), timeout=timeout)
807+
time1 = time.perf_counter()
808+
latency_ping = round(time1 - time0, 3)
809+
latency_pong = timeout * 1000
798810
await asyncio.wait_for(pong_waiter, timeout=timeout)
799-
t2 = time.perf_counter()
800-
latency_ping = round(1000 * (t1 - t0))
801-
latency_pong = round(1000 * (t2 - t1))
811+
time2 = time.perf_counter()
812+
latency_pong = round(time2 - time1, 3)
802813
_LOGGER.debug(
803814
f"Connection latency from '{self.central.csid}' to '{self.id}': ping={latency_ping} ms, pong={latency_pong} ms",
804815
)
805816
self._metrics[cstat.latency_ping.value].value = latency_ping
806817
self._metrics[cstat.latency_pong.value].value = latency_pong
807818
await asyncio.sleep(timeout)
808819

809-
except asyncio.TimeoutError:
810-
_LOGGER.debug(f"Timeout in connection '{self.id}'")
811-
self._connection.close()
812-
except websockets.exceptions.ConnectionClosed as connection_closed_exception:
813-
_LOGGER.debug(
814-
f"Connection closed to '{self.id}': {connection_closed_exception}"
815-
)
816-
except Exception as other_exception:
817-
_LOGGER.error(
818-
f"Unexpected exception in connection to '{self.id}': {other_exception}",
819-
exc_info=True,
820-
)
820+
except asyncio.TimeoutError as timeout_exception:
821+
self._metrics[cstat.latency_ping.value].value = latency_ping
822+
self._metrics[cstat.latency_pong.value].value = latency_pong
823+
raise timeout_exception
821824

822825
async def _handle_call(self, msg):
823826
try:
@@ -828,32 +831,45 @@ async def _handle_call(self, msg):
828831

829832
async def start(self):
830833
"""Start charge point."""
834+
await self.run(
835+
[super().start(), self.post_connect(), self.monitor_connection()]
836+
)
837+
838+
async def run(self, tasks):
839+
"""Run a specified list of tasks."""
840+
self.tasks = [asyncio.ensure_future(task) for task in tasks]
831841
try:
832-
await asyncio.gather(
833-
super().start(), self.monitor_connection(), self.post_connect()
842+
await asyncio.gather(*self.tasks)
843+
except asyncio.TimeoutError:
844+
_LOGGER.debug(f"Timeout in connection '{self.id}'")
845+
except websockets.exceptions.WebSocketException as websocket_exception:
846+
_LOGGER.debug(f"Connection closed to '{self.id}': {websocket_exception}")
847+
except Exception as other_exception:
848+
_LOGGER.error(
849+
f"Unexpected exception in connection to '{self.id}': '{other_exception}'",
850+
exc_info=True,
834851
)
835-
except websockets.exceptions.WebSocketException as e:
836-
_LOGGER.debug("Websockets exception: %s", e)
837852
finally:
853+
await self.stop()
854+
855+
async def stop(self):
856+
"""Close connection and cancel ongoing tasks."""
857+
self.status = STATE_UNAVAILABLE
858+
if self._connection.open:
859+
_LOGGER.debug(f"Closing websocket: '{self._connection.id}'")
838860
await self._connection.close()
839-
self.status = STATE_UNAVAILABLE
861+
for task in self.tasks:
862+
task.cancel()
840863

841-
async def reconnect(self, connection: websockets.connection):
864+
async def reconnect(self, connection: websockets.server.WebSocketServerProtocol):
842865
"""Reconnect charge point."""
843-
# close old connection, if needed
844-
if self._connection is not None:
845-
await self._connection.close()
846-
# use the new connection
866+
_LOGGER.debug(f"Reconnect {connection.id}")
867+
868+
await self.stop()
869+
self.status = STATE_OK
847870
self._connection = connection
848871
self._metrics[cstat.reconnects.value].value += 1
849-
try:
850-
self.status = STATE_OK
851-
await asyncio.gather(super().start(), self.monitor_connection())
852-
except websockets.exceptions.WebSocketException as e:
853-
_LOGGER.debug("Websockets exception: %s", e)
854-
finally:
855-
await self._connection.close()
856-
self.status = STATE_UNAVAILABLE
872+
await self.run([super().start(), self.monitor_connection()])
857873

858874
async def async_update_device_info(self, boot_info: dict):
859875
"""Update device info asynchronuously."""
@@ -887,13 +903,13 @@ def average_of_nonzero(values):
887903
return average
888904

889905
measurand_data = {}
890-
for sv in data:
906+
for item in data:
891907
# create ordered Dict for each measurand, eg {"voltage":{"unit":"V","L1":"230"...}}
892-
measurand = sv.get(om.measurand.value, None)
893-
phase = sv.get(om.phase.value, None)
894-
value = sv.get(om.value.value, None)
895-
unit = sv.get(om.unit.value, None)
896-
context = sv.get(om.context.value, None)
908+
measurand = item.get(om.measurand.value, None)
909+
phase = item.get(om.phase.value, None)
910+
value = item.get(om.value.value, None)
911+
unit = item.get(om.unit.value, None)
912+
context = item.get(om.context.value, None)
897913
if measurand is not None and phase is not None:
898914
if measurand not in measurand_data:
899915
measurand_data[measurand] = {}
@@ -1036,8 +1052,8 @@ def on_boot_notification(self, **kwargs):
10361052
interval=3600,
10371053
status=RegistrationStatus.accepted.value,
10381054
)
1055+
self.received_boot_notification = True
10391056
_LOGGER.debug("Received boot notification for %s: %s", self.id, kwargs)
1040-
self.hass.async_create_task(self.notify_ha(f"Charger {self.id} booted"))
10411057
# update metrics
10421058
self._metrics[cdet.model.value].value = kwargs.get(
10431059
om.charge_point_model.name, None
@@ -1054,7 +1070,9 @@ def on_boot_notification(self, **kwargs):
10541070

10551071
self.hass.async_create_task(self.async_update_device_info(kwargs))
10561072
self.hass.async_create_task(self.central.update(self.central.cpid))
1057-
self.hass.async_create_task(self.post_connect())
1073+
if self.triggered_boot_notification is False:
1074+
self.hass.async_create_task(self.notify_ha(f"Charger {self.id} rebooted"))
1075+
self.hass.async_create_task(self.post_connect())
10581076
return resp
10591077

10601078
@on(Action.StatusNotification)

0 commit comments

Comments
 (0)