diff --git a/backend/api/models/teros_data.py b/backend/api/models/teros_data.py index 826e169c..5dc474e2 100644 --- a/backend/api/models/teros_data.py +++ b/backend/api/models/teros_data.py @@ -108,6 +108,7 @@ def get_teros_data_obj( if end_time is None: end_time = datetime.now() + data = { "timestamp": [], "vwc": [], diff --git a/backend/api/resources/cell_data.py b/backend/api/resources/cell_data.py index c5be07d8..0c4f2653 100644 --- a/backend/api/resources/cell_data.py +++ b/backend/api/resources/cell_data.py @@ -1,40 +1,346 @@ -from flask import request, jsonify -from flask_restful import Resource -import pandas as pd -from ..schemas.get_cell_data_schema import GetCellDataSchema -from ..models.power_data import PowerData # noqa: F401 -from ..models.teros_data import TEROSData # noqa: F401 -from ..models.sensor import Sensor # noqa: F401 +import csv +from datetime import datetime from io import StringIO -from celery import shared_task +from flask import Response, request, stream_with_context +from flask_restful import Resource +from sqlalchemy import func +from dateutil.relativedelta import relativedelta + +from ..models import db +from ..models.cell import Cell +from ..models.data import Data +from ..models.power_data import PowerData +from ..models.sensor import Sensor +from ..models.teros_data import TEROSData +from ..schemas.get_cell_data_schema import GetCellDataSchema get_cell_data = GetCellDataSchema() +CSV_HEADERS = [ + "cell_id", + "cell_name", + "timestamp", + "vwc", + "temp", + "ec", + "raw_vwc", + "v", + "i", + "p", + "data", + "measurement", + "unit", + "type", +] + +VOID = "void" + +def _serialize_timestamp(timestamp): + if timestamp is None: + return VOID + if isinstance(timestamp, datetime): + return timestamp.isoformat(sep=" ") + return str(timestamp) + + +def _cell_filename(): + stamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") + return f"cell-data-{stamp}.csv" + + +def _normalize_time_range(start_time, end_time): + if start_time is None: + start_time = datetime.now() - relativedelta(months=1) + if end_time is None: + end_time = datetime.now() + return start_time, end_time + + +def _iter_teros_rows(cell_id, resample, start_time, end_time): + if resample == "none": + stmt = ( + db.select( + TEROSData.ts.label("timestamp"), + TEROSData.vwc.label("vwc"), + TEROSData.temp.label("temp"), + TEROSData.ec.label("ec"), + TEROSData.raw_vwc.label("raw_vwc"), + ) + .where( + (TEROSData.cell_id == cell_id) + & (TEROSData.ts.between(start_time, end_time)) + ) + .order_by(TEROSData.ts) + ) + else: + date_trunc = func.date_trunc(resample, TEROSData.ts).label("timestamp") + normalized_vwc = TEROSData._to_percent_if_fraction_expr(TEROSData.vwc) + stmt = ( + db.select( + date_trunc, + func.avg(normalized_vwc).label("vwc"), + func.avg(TEROSData.temp).label("temp"), + func.avg(TEROSData.ec).label("ec"), + func.avg(TEROSData.raw_vwc).label("raw_vwc"), + ) + .where( + (TEROSData.cell_id == cell_id) + & (TEROSData.ts.between(start_time, end_time)) + ) + .group_by(date_trunc) + .order_by(date_trunc) + ) + + # Ensure large exports don't get fully buffered in memory by the DB driver. + result = db.session.execute(stmt.execution_options(stream_results=True)).yield_per( + 1000 + ) + for row in result: + yield { + "timestamp": row.timestamp, + "vwc": TEROSData._to_percent_if_fraction(row.vwc), + "temp": row.temp, + "ec": int(row.ec) if row.ec is not None else None, + "raw_vwc": row.raw_vwc, + } + + +def _iter_power_rows(cell_id, resample, start_time, end_time): + if resample == "none": + stmt = ( + db.select( + PowerData.ts.label("timestamp"), + (PowerData.voltage * 1e3).label("v"), + (PowerData.current * 1e6).label("i"), + (PowerData.voltage * PowerData.current * 1e6).label("p"), + ) + .where( + (PowerData.cell_id == cell_id) + & (PowerData.ts.between(start_time, end_time)) + ) + .order_by(PowerData.ts) + ) + else: + date_trunc = func.date_trunc(resample, PowerData.ts).label("timestamp") + stmt = ( + db.select( + date_trunc, + func.avg(PowerData.voltage * 1e3).label("v"), + func.avg(PowerData.current * 1e6).label("i"), + func.avg(PowerData.voltage * PowerData.current * 1e6).label("p"), + ) + .where( + (PowerData.cell_id == cell_id) + & (PowerData.ts.between(start_time, end_time)) + ) + .group_by(date_trunc) + .order_by(date_trunc) + ) -@shared_task(bind=True, ignore_result=False) -def stream_csv(self, request_args): - """Creates a task that queries cell data and writes into buffer + result = db.session.execute(stmt.execution_options(stream_results=True)).yield_per( + 1000 + ) + for row in result: + yield { + "timestamp": row.timestamp, + "v": row.v, + "i": row.i, + "p": row.p, + } - jmadden173: Temperarily converted this to a blank download. - Arguments: - request_args -- arguments of the request being made - """ +def _sensor_value_column(data_type): + if data_type == "float": + return Data.float_val + if data_type == "int": + return Data.int_val + if data_type == "text": + return Data.text_val + return None - csv_buffer = StringIO() - df = pd.DataFrame() +def _iter_sensor_rows(cell_id, resample, start_time, end_time): + sensor = Sensor.query.filter_by( + name="phytos31", + measurement="voltage", + cell_id=cell_id, + ).first() - df.to_csv(csv_buffer, index=False) + if sensor is None: + return - return csv_buffer.getvalue() + value_column = _sensor_value_column(sensor.data_type) + if value_column is None: + return + + # Text values can't be averaged; safest behavior is to disable resampling + # rather than raising a 500 during export. + if resample != "none" and sensor.data_type == "text": + resample = "none" + + if resample == "none": + stmt = ( + db.select( + Data.ts.label("timestamp"), + value_column.label("data"), + ) + .where(Data.sensor_id == sensor.id) + .filter(Data.ts.between(start_time, end_time)) + .order_by(Data.ts) + ) + else: + date_trunc = func.date_trunc(resample, Data.ts).label("timestamp") + stmt = ( + db.select( + date_trunc, + func.avg(value_column).label("data"), + ) + .where(Data.sensor_id == sensor.id) + .filter(Data.ts.between(start_time, end_time)) + .group_by(date_trunc) + .order_by(date_trunc) + ) + + result = db.session.execute(stmt.execution_options(stream_results=True)).yield_per( + 1000 + ) + for row in result: + yield { + "timestamp": row.timestamp, + "data": row.data, + "measurement": sensor.measurement, + "unit": sensor.unit, + "type": sensor.data_type, + } + + +def _next_or_none(iterator): + try: + return next(iterator) + except StopIteration: + return None + + +def _merge_cell_rows(cell): + teros_rows = _iter_teros_rows( + cell["id"], + cell["resample"], + cell["start_time"], + cell["end_time"], + ) + power_rows = _iter_power_rows( + cell["id"], + cell["resample"], + cell["start_time"], + cell["end_time"], + ) + sensor_rows = _iter_sensor_rows( + cell["id"], + cell["resample"], + cell["start_time"], + cell["end_time"], + ) + + sources = [ + {"iterator": iter(teros_rows), "current": None}, + {"iterator": iter(power_rows), "current": None}, + {"iterator": iter(sensor_rows or ()), "current": None}, + ] + + for source in sources: + source["current"] = _next_or_none(source["iterator"]) + + while any(source["current"] is not None for source in sources): + current_timestamp = min( + source["current"]["timestamp"] + for source in sources + if source["current"] is not None + ) + row = { + "cell_id": cell["id"], + "cell_name": cell["name"], + "timestamp": _serialize_timestamp(current_timestamp), + "vwc": VOID, + "temp": VOID, + "ec": VOID, + "raw_vwc": VOID, + "v": VOID, + "i": VOID, + "p": VOID, + "data": VOID, + "measurement": VOID, + "unit": VOID, + "type": VOID, + } + + for source in sources: + while ( + source["current"] is not None + and source["current"]["timestamp"] == current_timestamp + ): + for key, value in source["current"].items(): + if key != "timestamp" and value is not None: + row[key] = value + source["current"] = _next_or_none(source["iterator"]) + + yield row + + +def _stream_csv_rows(cells, resample, start_time, end_time): + line_buffer = StringIO() + writer = csv.DictWriter(line_buffer, fieldnames=CSV_HEADERS) + writer.writeheader() + yield line_buffer.getvalue() + line_buffer.seek(0) + line_buffer.truncate(0) + + for cell in cells: + cell["resample"] = resample + cell["start_time"] = start_time + cell["end_time"] = end_time + for row in _merge_cell_rows(cell): + writer.writerow(row) + yield line_buffer.getvalue() + line_buffer.seek(0) + line_buffer.truncate(0) class Cell_Data(Resource): def get(self): - result = stream_csv.delay(request.args) - return jsonify({"result_id": result.id}) + v_args = get_cell_data.load(dict(request.args)) + start_time, end_time = _normalize_time_range( + v_args.get("startTime"), + v_args.get("endTime"), + ) + cell_ids = [ + int(cell_id.strip()) + for cell_id in v_args["cellIds"].split(",") + if cell_id.strip() + ] + cells = ( + Cell.query.with_entities(Cell.id, Cell.name) + .filter(Cell.id.in_(cell_ids)) + .order_by(Cell.id) + .all() + ) + cell_rows = [{"id": cell.id, "name": cell.name} for cell in cells] + + response = Response( + stream_with_context( + _stream_csv_rows( + cell_rows, + v_args["resample"], + start_time, + end_time, + ) + ), + mimetype="text/csv", + ) + response.headers["Content-Disposition"] = ( + f'attachment; filename="{_cell_filename()}"' + ) + return response def post(self): pass diff --git a/backend/api/utils/get_or_create.py b/backend/api/utils/get_or_create.py index 36aa189c..2a58d88b 100644 --- a/backend/api/utils/get_or_create.py +++ b/backend/api/utils/get_or_create.py @@ -22,15 +22,15 @@ def get_or_create_logger(sess, name, mac=None, hostname=None): """ stmt = select(Logger).where(Logger.name == name) - if mac: - stmt = stmt.where(Logger.mac == mac) - if hostname: - stmt = stmt.where(Logger.hostname == hostname) + # Logger model currently has no mac/hostname columns; keep params for + # backwards compatibility but ignore them for querying. log = sess.execute(stmt).one_or_none() if not log: - log = Logger(name=name, mac=mac, hostname=hostname) + # Logger model doesn't accept mac/hostname; keep creation minimal + # for import flows. + log = Logger(name=name) sess.add(log) sess.flush() else: diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 2d60e12b..55007437 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -34,8 +34,6 @@ def requires_ttn(): env = os.environ.get("TTN_API_KEY") return pytest.mark.skipif(env is None, reason="TTN_API_KEY not availble") - - # -------- # Fixtures # -------- diff --git a/backend/tests/test_cell_data.py b/backend/tests/test_cell_data.py new file mode 100644 index 00000000..69a16f83 --- /dev/null +++ b/backend/tests/test_cell_data.py @@ -0,0 +1,126 @@ +from datetime import datetime + +from api import db +from api.models.cell import Cell +from api.models.data import Data +from api.models.logger import Logger +from api.models.power_data import PowerData +from api.models.sensor import Sensor +from api.models.teros_data import TEROSData + + +def test_cell_data_streams_csv(init_database): + cell1 = Cell("cell_csv_1", "", 1, 1, False, None) + cell2 = Cell("cell_csv_2", "", 1, 1, False, None) + logger = Logger("logger_csv_1") + db.session.add_all([cell1, cell2, logger]) + db.session.commit() + + ts = datetime(2024, 1, 1, 12, 0) + ts2 = datetime(2024, 1, 1, 12, 30) + + db.session.add( + TEROSData( + cell_id=cell1.id, + ts=ts, + vwc=0.42, + raw_vwc=420.0, + temp=21.5, + ec=120, + water_pot=0.0, + ) + ) + # Second cell has only TEROS data so we can validate void-filling. + db.session.add( + TEROSData( + cell_id=cell2.id, + ts=ts, + vwc=0.5, # fractional should normalize to 50.0 + raw_vwc=500.0, + temp=20.0, + ec=100, + water_pot=0.0, + ) + ) + db.session.add( + PowerData( + logger_id=logger.id, + cell_id=cell1.id, + ts=ts, + voltage=1.5, + current=0.002, + ) + ) + sensor = Sensor( + cell_id=cell1.id, + measurement="voltage", + data_type="float", + unit="V", + name="phytos31", + ) + db.session.add(sensor) + db.session.commit() + db.session.add(Data(sensor_id=sensor.id, ts=ts, float_val=3.14)) + # Add a second TEROS point within the same hour to validate resample=hour averaging. + db.session.add( + TEROSData( + cell_id=cell1.id, + ts=ts2, + vwc=50.0, # already percent; should not double scale + raw_vwc=500.0, + temp=22.5, + ec=130, + water_pot=0.0, + ) + ) + db.session.commit() + + response = init_database.get( + "/api/cell/datas", + query_string={ + "cellIds": f"{cell1.id},{cell2.id}", + "resample": "none", + "startTime": "Mon, 01 Jan 2024 00:00:00 GMT", + "endTime": "Tue, 02 Jan 2024 00:00:00 GMT", + }, + ) + + assert response.status_code == 200 + assert response.mimetype == "text/csv" + assert "attachment;" in response.headers["Content-Disposition"] + assert response.is_streamed + + body = response.get_data(as_text=True) + lines = body.strip().splitlines() + + expected_header = ( + "cell_id,cell_name,timestamp,vwc,temp,ec,raw_vwc,v,i,p,data," + "measurement,unit,type" + ) + assert lines[0] == expected_header + expected_line1 = ( + f"{cell1.id},{cell1.name},2024-01-01 12:00:00,42.0,21.5,120,420.0," + "1500.0,2000.0,3000.0,3.14,voltage,V,float" + ) + assert expected_line1 in lines + # Validate missing power/sensor columns become "void" for the second cell. + expected_line2 = ( + f"{cell2.id},{cell2.name},2024-01-01 12:00:00,50.0,20.0,100,500.0," + "void,void,void,void,void,void,void" + ) + assert expected_line2 in lines + + # Resample=hour should average normalized VWC values (0.42->42 and 50 stays 50). + response_resampled = init_database.get( + "/api/cell/datas", + query_string={ + "cellIds": str(cell1.id), + "resample": "hour", + "startTime": "Mon, 01 Jan 2024 00:00:00 GMT", + "endTime": "Tue, 02 Jan 2024 00:00:00 GMT", + }, + ) + assert response_resampled.status_code == 200 + body2 = response_resampled.get_data(as_text=True) + # Expected average: (42 + 50) / 2 = 46.0 + assert f"{cell1.id},{cell1.name},2024-01-01 12:00:00,46.0" in body2 diff --git a/backend/tests/test_ttn.py b/backend/tests/test_ttn.py index 86bfe1a6..0a7318c0 100644 --- a/backend/tests/test_ttn.py +++ b/backend/tests/test_ttn.py @@ -1,3 +1,4 @@ +import os from api.ttn.end_devices import EndDevice, EntsEndDevice, TTNApi from .conftest import requires_ttn @@ -8,6 +9,17 @@ def test_create_api(): global api api = TTNApi() +# These tests exercise real TTN API calls. Skip by default unless valid creds are present. +_ttn_key = (os.getenv("TTN_API_KEY") or "").strip() +_ttn_app = (os.getenv("TTN_APP_ID") or "").strip() +pytestmark = pytest.mark.skipif( + (not _ttn_key) + or (not _ttn_app) + or (_ttn_key.lower() in {"dummy", "changeme", "token", "test"}) + or (_ttn_app.lower() in {"dummy", "changeme", "app", "test"}), + reason="TTN integration tests require real TTN_API_KEY and TTN_APP_ID.", +) + @requires_ttn() def test_create_end_device(): diff --git a/frontend/src/pages/dashboard/components/DownloadBtn.jsx b/frontend/src/pages/dashboard/components/DownloadBtn.jsx index f53e8e56..e2f15587 100644 --- a/frontend/src/pages/dashboard/components/DownloadBtn.jsx +++ b/frontend/src/pages/dashboard/components/DownloadBtn.jsx @@ -1,48 +1,39 @@ // import { useEffect } from 'react'; import { Button } from '@mui/material'; import PropTypes from 'prop-types'; -import { getCellData, pollCellDataResult } from '../../../services/cell'; +import { getCellData } from '../../../services/cell'; import { useState } from 'react'; function DownloadBtn({ cells, startDate, endDate }) { const [downloadStatus, setDownloadStatus] = useState(false); - const INTERVAL = 2000; - const BACKOFF = 2000; - let pendingResponses = 0; - - const pollTaskStatus = async (taskId, fileName, pollDuration) => { - try { - const { state, status } = await pollCellDataResult(taskId); - if (state === 'SUCCESS') { - const blob = new Blob([status], { type: 'text/csv' }); - const a = document.createElement('a'); - a.download = fileName; - a.href = window.URL.createObjectURL(blob); - document.body.appendChild(a); - a.click(); - document.body.removeChild(a); - setDownloadStatus(false); - } else { - setTimeout(() => { - pendingResponses += 1; - pollDuration = BACKOFF * pendingResponses + pollDuration; - return pollTaskStatus(taskId, fileName, pollDuration); - }, pollDuration); - } - } catch (error) { - console.error('Error polling the task status', error); + const downloadFile = async () => { + if (!cells?.length) { + return; } - }; - const downloadFile = () => { - for (const { id, name } of cells) { - setDownloadStatus(true); - const fileName = name + '.csv'; + setDownloadStatus(true); + + try { const resample = 'none'; - getCellData(id, resample, startDate, endDate).then((data) => { - const { result_id } = data; - pollTaskStatus(result_id, fileName, INTERVAL); - }); + const { blob, fileName } = await getCellData( + cells.map((cell) => cell.id), + resample, + startDate, + endDate, + ); + + const downloadUrl = window.URL.createObjectURL(blob); + const a = document.createElement('a'); + a.download = fileName || 'cell-data.csv'; + a.href = downloadUrl; + document.body.appendChild(a); + a.click(); + document.body.removeChild(a); + window.URL.revokeObjectURL(downloadUrl); + } catch (error) { + console.error('Error exporting CSV', error); + } finally { + setDownloadStatus(false); } }; /** @@ -51,7 +42,7 @@ function DownloadBtn({ cells, startDate, endDate }) { **/ const exportToCsv = (e) => { e.preventDefault(); - downloadFile(); + void downloadFile(); }; return (