Skip to content

Commit 43ed189

Browse files
implement datastream websocket pushing for observations
1 parent 7308c1f commit 43ed189

File tree

1 file changed

+43
-9
lines changed

1 file changed

+43
-9
lines changed

oshconnect/datasource.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from consys4py.constants import APIResourceTypes
2020
from consys4py.datamodels.observations import ObservationOMJSONInline
2121
from consys4py.datamodels.swe_components import DataRecordSchema
22+
from websockets import ConnectionClosed
2223

2324
from .core_datamodels import DatastreamResource, SystemResource, TimePeriod
2425
from .timemanagement import TemporalModes
@@ -66,6 +67,9 @@ def __init__(self, name: str, datastream: DatastreamResource,
6667
self._url = None
6768
self._auth = None
6869
self._extra_headers = None
70+
self._insertion_websocket: websockets.client = None
71+
self._insertion_mode = "http"
72+
6973
if self._parent_system.get_parent_node().is_secure:
7074
self._auth = self._parent_system.get_parent_node().get_decoded_auth()
7175
self._extra_headers = {'Authorization': f'Basic {self._auth}'}
@@ -150,6 +154,8 @@ async def connect(self) -> websockets.WebSocketClientProtocol or None:
150154
if self._playback_mode == TemporalModes.REAL_TIME:
151155
self._playback_websocket = await websockets.connect(self._url,
152156
extra_headers=self._extra_headers)
157+
self._insertion_websocket = await websockets.connect(uri=self.generate_insertion_url(),
158+
additional_headers=self._extra_headers)
153159
self._status = "connected"
154160
return self._playback_websocket
155161
elif self._playback_mode == TemporalModes.ARCHIVE:
@@ -171,6 +177,7 @@ def disconnect(self):
171177
:return:
172178
"""
173179
self._playback_websocket.close()
180+
self._insertion_websocket.close()
174181

175182
def reset(self):
176183
"""
@@ -257,12 +264,22 @@ def generate_insertion_url(self) -> str:
257264
258265
:return:
259266
"""
260-
url_result = (
261-
f'http://{self._parent_system.get_parent_node().get_address()}:'
262-
f'{self._parent_system.get_parent_node().get_port()}'
263-
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
264-
f'/observations'
265-
)
267+
# TODO: needs to respect secure vs insecure protocols as well as http/ws
268+
# TODO: further implement MQTT client, esp bidirectional pub/sub
269+
if self._insertion_mode is not None or self._insertion_mode == "http":
270+
url_result = (
271+
f'http://{self._parent_system.get_parent_node().get_address()}:'
272+
f'{self._parent_system.get_parent_node().get_port()}'
273+
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
274+
f'/observations'
275+
)
276+
elif self._insertion_mode == "realtime":
277+
url_result = (
278+
f'ws://{self._parent_system.get_parent_node().get_address()}:'
279+
f'{self._parent_system.get_parent_node().get_port()}'
280+
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
281+
f'/observations'
282+
)
266283
return url_result
267284

268285
def insert_observation(self, observation: ObservationOMJSONInline):
@@ -271,9 +288,26 @@ def insert_observation(self, observation: ObservationOMJSONInline):
271288
:param observation: ObservationOMJSONInline object
272289
:return:
273290
"""
274-
api_helper = self._parent_system.get_parent_node().get_api_helper()
275-
api_helper.post_resource(APIResourceTypes.OBSERVATION, parent_res_id=self._datastream.ds_id,
276-
data=observation.model_dump(), req_headers={'Content-Type': 'application/om+json'})
291+
if self._insertion_mode is None or self._insertion_mode == "http":
292+
api_helper = self._parent_system.get_parent_node().get_api_helper()
293+
api_helper.post_resource(APIResourceTypes.OBSERVATION, parent_res_id=self._datastream.ds_id,
294+
data=observation.model_dump(), req_headers={'Content-Type': 'application/om+json'})
295+
elif self._insertion_mode == "realtime":
296+
try:
297+
self._insertion_websocket.send(observation.model_dump())
298+
except ConnectionClosed:
299+
print("Connection closed")
300+
301+
# async def _produce_observation(self, observation: ObservationOMJSONInline):
302+
# """
303+
# Posts an observation to the server using the websocket connection
304+
# :param observation: ObservationOMJSONInline object
305+
# :return:
306+
# """
307+
# if self._insertion_websocket is not None:
308+
# await self._insertion_websocket.send(observation.model_dump())
309+
# else:
310+
# raise ValueError("Websocket connection not established")
277311

278312

279313
class DataStreamHandler:

0 commit comments

Comments
 (0)