diff --git a/.env b/.env new file mode 100644 index 0000000..826cc47 --- /dev/null +++ b/.env @@ -0,0 +1,12 @@ +# Redis +REDIS_HOST=redis +REDIS_PORT=6379 +REDIS_DB=0 + +# NFGDA Service +MAX_CONCURRENT_JOBS=2 # Max number of jobs to run at once +MAX_NO_DATA_POLLS=10 # Polls radar S3 bucket this many times before giving up +FILE_EXPIRATION_TIME=1440 # 24 hours (minutes) + +# Backend +MAX_JOB_DURATION=180 # Maximum total duration of job timebox (minutes, default is 3 hours) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml new file mode 100644 index 0000000..9693135 --- /dev/null +++ b/.github/workflows/integration-tests.yml @@ -0,0 +1,48 @@ +name: Integration Tests + +on: + pull_request: + +jobs: + integration-tests: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install test dependencies + run: pip install pytest requests + + - name: Start Docker Compose stack + run: docker compose up -d --build + + - name: Wait for backend to be ready + run: | + echo "Waiting for backend..." + for i in $(seq 1 30); do + if curl -sf http://localhost:8001/apis/stations > /dev/null; then + echo "Backend is up" + exit 0 + fi + echo " attempt $i/30..." 
+ sleep 5 + done + echo "Backend did not become ready in time" + docker compose logs backend + exit 1 + + - name: Run integration tests (non-slow) + run: pytest test_endpoints.py -v -m "not slow" + + - name: Dump logs on failure + if: failure() + run: docker compose logs + + - name: Tear down stack + if: always() + run: docker compose down diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 2ccc581..e223abe 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -1,16 +1,16 @@ name: Lint -on: [push, pull_request] +on: [pull_request] jobs: lint: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 + - uses: actions/setup-python@v5 with: - python-version: '3.x' + python-version: "3.x" - name: Install dependencies run: pip install ruff - name: Run linter - run: ruff . + run: ruff check . diff --git a/README.md b/README.md index 1d8971e..ca7f719 100644 --- a/README.md +++ b/README.md @@ -2,43 +2,59 @@ This is a prototype web interface to interact with the gust front detection algorithm found [here](https://github.com/firelab/NFGDA). -# How To Run +# How To Run First time running project? -1. Navigate to project directory containing `docker-compose.yml' -2. Run `docker compose up -d --build' + +1. Navigate to project directory containing `docker-compose.yml` +2. Run `docker compose up -d --build` 3. Navigate to http://localhost:5173 4. 
Play widdit - To re-launch app, run `docker compose up -d` - To restart docker containers, run `docker compose restart -d` - # Frontend +# Backend (/backend and /nfgda_service) -# Backend +Backend directory structure: -Backend directory structure: - app.py contains the API endpoints - /apis contains the API endpoint definitions - API call logic defined in src/ -- src/ contains the backend logic (Not responsible for API endpoints that orchestrate or handle HTTP requests - Contains the business logic of the application only) +- src/ contains the backend logic (Not responsible for API endpoints that orchestrate or handle HTTP requests. Contains the business logic of the application only) + +NFGDA Service directory structure: +- /nfgda_service contains the NFGDA service logic +- responsible for all NFGDA execution, output processing, and file management -# Todo (before MSU handoff) +And then there's a redis instance living at port 6379 where all the job status and asset information is stored. -- Guard against short jobs that run forever for some reason -- Figure out zoom level / blank frame issue on frontend -- Switching to a new station view pauses slide deck playthrough -- Convert geotiff output to cloud-optimized-geotiffs -- Remove "expired" job files and produced resources after 24 hours (set to env variable) -- Figure out what is a "reasonable" time to run a historical job and set a hard limit (Natalie said a few hours, we'll set it to 3 to start with in env) -- Code cleanup / add comments where necessary +# Todo before MSU handoff + +- [ ] Figure out zoom level / blank frame issue on frontend +- [ ] Switching to a new station view pauses slide deck playthrough +- [ ] Can we pretty up the landing page? Put a title on it somewhere before the research celebration? 
+- [ ] Set opacity slider on frontend +- [ ] Enhance resolution of output on frontend +- [ ] Add a "clear" button to the map that clears all job assets from the map +- [ ] Deliver frame time-stamps to the frontend +- [ ] Switch to cloud-optimized geotiffs +- [ ] Make some stuff environment variables instead of random variables everywhere +- [ ] Discuss pixel-width of gust fronts written to output file next team meeting +- [ ] Diff the NFGDA code used in nfgda_service with the original NFGDA code, see if there are any useful features we're missing out on or bugs we introduced +- [ ] Backend code cleanup / add comments where necessary # "Nice to have" features +- There should probably be a warning that shows up for small numbers of assets per job (2 frames produced or less). Maybe if not enough assets are produced, the job request could automatically re-run with a larger time window? - Average time to job completion estimator (small addition: new counter in redis, average out) -- Serve tiles instead of individual GeoTIFFs (big refactor) +- Serve tiles instead of individual GeoTIFFs (big refactor, honestly might not be worth it as Cloud-optimized-geotiffs are kinda the future anyway) - Hash job IDs to make them unguessable, so resources can't be directly accessed via URL (little development effort, likely med/large refactor effort) - + +# Todo after MSU handoff (future devs read this pls) + +- Check that automatic asset deletion occurs within the timeframe specified (should be 24 hours) +- Familiarize with the .env file and environment variables, and what they do diff --git a/backend/apis/retrieve_frames.py b/backend/apis/retrieve_frames.py index 45dfeb4..78f5b79 100644 --- a/backend/apis/retrieve_frames.py +++ b/backend/apis/retrieve_frames.py @@ -1,7 +1,23 @@ """ -Frame Data API: returns the list of rendered frames for a completed job. +Frame Data API: returns a single rendered GeoTIFF frame for a completed job. 
""" -def get_frames(job_id: str): - - pass +import os +from flask import send_file, abort + + +def get_frame(job_id: str, index: int): + """Return a single GeoTIFF frame file for the given job and frame index.""" + job_dir = "/processed_data/" + job_id + if not os.path.exists(job_dir): + abort(404, description="Job not found") + + frame_path = job_dir + f"/frame_{index}.tif" + if not os.path.exists(frame_path): + abort(404, description="Frame not found") + + return send_file( + frame_path, + mimetype="image/tiff", + as_attachment=False + ) diff --git a/backend/apis/run_request.py b/backend/apis/run_request.py index 4dca0b0..4b3218d 100644 --- a/backend/apis/run_request.py +++ b/backend/apis/run_request.py @@ -1,4 +1,5 @@ +import os import uuid from datetime import datetime, timedelta, timezone from flask import jsonify @@ -15,14 +16,14 @@ def send_job_to_redis_queue(redis_client, request_fields: dict): Response shape: { "job_id": "", - "status": 200 + "status": 202 OR "error": "", "status": 400 } """ - # Validate stationId + # validate stationId station_id = request_fields.get("stationId") if not station_id: return jsonify({"error": "Missing stationId request field"}), 400 @@ -32,7 +33,8 @@ def send_job_to_redis_queue(redis_client, request_fields: dict): StationService(redis_client).get_station(station_id) except ValueError: return jsonify({"error": f"Invalid station ID: {station_id}"}), 400 - # Validate and/or set default timebox parameters + + # validate and/or set default timebox parameters validation_error = validate_time_parameters(request_fields) if validation_error: return validation_error, 400 @@ -46,11 +48,14 @@ def send_job_to_redis_queue(redis_client, request_fields: dict): # add job to redis job_key = f"job:{job_id}" + expiry_minutes = int(os.getenv("FILE_EXPIRATION_TIME", "1440")) + expiry_timestamp = (datetime.now(timezone.utc) + timedelta(minutes=expiry_minutes)).strftime("%Y-%m-%dT%H:%M:%SZ") redis_client.hset(job_key, mapping={ "stationId": 
request_fields["stationId"], "startUtc": request_fields["startUtc"], "endUtc": request_fields["endUtc"], - "status": "PENDING" + "status": "PENDING", + "asset_expiry_timestamp": expiry_timestamp }) # push job id to job queue @@ -63,13 +68,14 @@ def send_job_to_redis_queue(redis_client, request_fields: dict): def validate_time_parameters(request_fields: dict): """Validate the time parameters recieved via the request.""" - # Default timebox when not provided: look back 15 minutes from now - # so the algorithm captures 2-3 recent NEXRAD scans for detection + forecast - # (2 scan minimum needed for forcasting) + # Default timebox when not provided: look back over the last ~25 minutes, ending + # 10 minutes ago. The 10-minute buffer ensures the algorithm's end time is always + # fully in the past — if endUtc is too close to "now" the algorithm enters live + # polling mode and runs indefinitely. now = datetime.now(timezone.utc) if not request_fields.get("startUtc") and not request_fields.get("endUtc"): - request_fields["startUtc"] = (now - timedelta(minutes=15)).strftime("%Y-%m-%dT%H:%M:%SZ") - request_fields["endUtc"] = now.strftime("%Y-%m-%dT%H:%M:%SZ") + request_fields["startUtc"] = (now - timedelta(minutes=35)).strftime("%Y-%m-%dT%H:%M:%SZ") + request_fields["endUtc"] = (now - timedelta(minutes=10)).strftime("%Y-%m-%dT%H:%M:%SZ") elif not request_fields.get("startUtc") or not request_fields.get("endUtc"): return jsonify({"error": "Must provide both startUtc and endUtc, or neither"}) @@ -88,15 +94,19 @@ def validate_time_parameters(request_fields: dict): if end_utc <= start_utc: return jsonify({"error": "endUtc must be after startUtc"}) - # Duration must be between 5 minutes and 6 hours + # duration must be between 15 minutes and MAX_JOB_DURATION (default is 180 minutes / 3 hours) + max_duration = timedelta(minutes=int(os.getenv("MAX_JOB_DURATION", "180"))) + max_hours = max_duration.total_seconds() / 3600 duration = end_utc - start_utc - if duration < 
timedelta(minutes=5): - return jsonify({"error": "Timebox duration must be at least 5 minutes"}) - if duration > timedelta(hours=6): - return jsonify({"error": "Timebox duration must not exceed 6 hours"}) - - # endUtc must not be in the future - if end_utc > now: - return jsonify({"error": "endUtc must not be later than the current time"}) + if duration < timedelta(minutes=15): + return jsonify({"error": "Timebox duration must be at least 15 minutes"}) + if duration > max_duration: + return jsonify({"error": f"Timebox duration must not exceed {max_hours:.0f} hours"}) + + # endUtc must be at least 5 minutes in the past — the algorithm enters a live + # polling loop if endUtc is too close to the current time, causing jobs to run + # indefinitely instead of processing a closed historical window. + if end_utc > now - timedelta(minutes=5): + return jsonify({"error": "endUtc must be at least 5 minutes in the past"}) return None diff --git a/backend/app.py b/backend/app.py index ef53456..7d6a753 100644 --- a/backend/app.py +++ b/backend/app.py @@ -1,10 +1,9 @@ -import os import redis -from flask import Flask, jsonify, request, send_file, abort +from flask import Flask, jsonify, request from apis.stations import list_stations_api from apis.run_request import send_job_to_redis_queue from apis.status import get_job_status -from apis.retrieve_frames import get_frames +from apis.retrieve_frames import get_frame app = Flask(__name__) @@ -12,7 +11,7 @@ redis_client = redis.Redis(host='redis', port=6379, db=0, decode_responses=True) # Station List API -@app.route("/APIs/stations", methods=["GET"]) +@app.route("/apis/stations", methods=["GET"]) def stations_endpoint(): """ Returns: @@ -27,7 +26,7 @@ def stations_endpoint(): # Algorithm Runner API -@app.route("/APIs/run", methods=["POST"]) +@app.route("/apis/run", methods=["POST"]) def run_endpoint(): """Takes station and time frame args, kicks off an NFGDA processing job, and returns the new job ID and status code.""" if not 
request.json: @@ -37,27 +36,14 @@ def run_endpoint(): # Frame Data API -@app.route("/APIs/jobs//frames/", methods=["GET"]) -def get_frame(job_id, index): +@app.route("/apis/jobs//frames/", methods=["GET"]) +def get_frame_endpoint(job_id, index): """Takes job ID and frame index, returns a single GeoTIFF file.""" - - job_dir = "/processed_data/" + job_id - if not os.path.exists(job_dir): - abort(404, description="Job not found") - - frame_path = job_dir + f"/frame_{index}.tif" - if not os.path.exists(frame_path): - abort(404, description="Frame not found") - - return send_file( - frame_path, - mimetype="image/tiff", - as_attachment=False - ) + return get_frame(job_id, index) # Job Status API -@app.route("/APIs/status", methods=["GET"]) +@app.route("/apis/status", methods=["GET"]) def status_endpoint(): """Takes job ID, returns status.""" job_id = request.args.get("job_id") diff --git a/docker-compose.yml b/docker-compose.yml index b6aacec..f75b717 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ services: redis: image: redis:7-alpine ports: - - "6379:6379" + - "${REDIS_PORT}:6379" nfgda-service: build: @@ -14,11 +14,12 @@ services: - redis - backend environment: - - REDIS_HOST=redis - - REDIS_PORT=6379 - - REDIS_DB=0 - - MAX_CONCURRENT_JOBS=2 - - MAX_NO_DATA_POLLS=10 + - REDIS_HOST=${REDIS_HOST} + - REDIS_PORT=${REDIS_PORT} + - REDIS_DB=${REDIS_DB} + - MAX_CONCURRENT_JOBS=${MAX_CONCURRENT_JOBS} + - MAX_NO_DATA_POLLS=${MAX_NO_DATA_POLLS} + - FILE_EXPIRATION_TIME=${FILE_EXPIRATION_TIME} volumes: - ./nfgda_output:/nfgda_output - ./processed_data:/processed_data @@ -44,9 +45,10 @@ services: depends_on: - redis environment: - - REDIS_HOST=redis - - REDIS_PORT=6379 - - REDIS_DB=0 + - REDIS_HOST=${REDIS_HOST} + - REDIS_PORT=${REDIS_PORT} + - REDIS_DB=${REDIS_DB} + - MAX_JOB_DURATION=${MAX_JOB_DURATION} volumes: - ./nfgda_output:/nfgda_output - ./processed_data:/processed_data \ No newline at end of file diff --git a/frontend/src/App.jsx 
b/frontend/src/App.jsx index 96db11e..25f5930 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -62,7 +62,7 @@ export default function App() { setjobId(""); setNumFrames(0); setFrames([]); - const response = await fetch("/APIs/run", { + const response = await fetch("/apis/run", { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -88,7 +88,7 @@ useEffect(() => { console.log(`attempting to fetch ${numFrames} frames for job ${jobId}`); try { const promises = Array.from({ length: numFrames }, (_, i) => - fetch(`/APIs/jobs/${jobId}/frames/${i}`) + fetch(`/apis/jobs/${jobId}/frames/${i}`) .then(res => { if (!res.ok) throw new Error(`Failed frame ${i}`); return res.blob(); @@ -105,10 +105,10 @@ useEffect(() => { fetchFrames(); }, [jobStatus, jobId, numFrames]); - // fetch radar stations from backend at /APIs/stations + // fetch radar stations from backend at /apis/stations useEffect(() => { async function loadStations() { - const response = await fetch("/APIs/stations"); + const response = await fetch("/apis/stations"); const stationJson = await response.json(); const nextStations = Array.isArray(stationJson?.features) ? 
stationJson.features @@ -126,7 +126,7 @@ useEffect(() => { } const intervalId = setInterval(async () => { try { - const response = await fetch(`/APIs/status?job_id=${jobId}`); + const response = await fetch(`/apis/status?job_id=${jobId}`); const data = await response.json(); console.log(data); setjobStatus(data.status); diff --git a/frontend/vite.config.js b/frontend/vite.config.js index 052046f..a7fc9f0 100644 --- a/frontend/vite.config.js +++ b/frontend/vite.config.js @@ -7,7 +7,7 @@ export default defineConfig({ plugins: [react(), tailwindcss()], server: { proxy: { - "/APIs": "http://backend:8001", //backend to work with docker, localhost w/o + "/apis": "http://backend:8001", //backend to work with docker, localhost w/o }, }, }); diff --git a/nfgda_service/Dockerfile b/nfgda_service/Dockerfile index e91ce22..274d204 100644 --- a/nfgda_service/Dockerfile +++ b/nfgda_service/Dockerfile @@ -23,4 +23,7 @@ RUN pip install --no-cache-dir -r requirements.txt COPY . /app +# algorithm modules (nfgda/, python/, scripts/) now live under /app/algorithm +ENV PYTHONPATH="/app/algorithm" + CMD ["python", "main.py"] \ No newline at end of file diff --git a/nfgda_service/matlab/NF00_header.m b/nfgda_service/algorithm/matlab/NF00_header.m similarity index 100% rename from nfgda_service/matlab/NF00_header.m rename to nfgda_service/algorithm/matlab/NF00_header.m diff --git a/nfgda_service/matlab/NF00ref_YHWANG.fis b/nfgda_service/algorithm/matlab/NF00ref_YHWANG.fis similarity index 100% rename from nfgda_service/matlab/NF00ref_YHWANG.fis rename to nfgda_service/algorithm/matlab/NF00ref_YHWANG.fis diff --git a/nfgda_service/matlab/NF07_handpick_region.m b/nfgda_service/algorithm/matlab/NF07_handpick_region.m similarity index 100% rename from nfgda_service/matlab/NF07_handpick_region.m rename to nfgda_service/algorithm/matlab/NF07_handpick_region.m diff --git a/nfgda_service/matlab/NF08_making_training_dataset.m b/nfgda_service/algorithm/matlab/NF08_making_training_dataset.m 
similarity index 100% rename from nfgda_service/matlab/NF08_making_training_dataset.m rename to nfgda_service/algorithm/matlab/NF08_making_training_dataset.m diff --git a/nfgda_service/matlab/NF08_reading_files_FIS.m b/nfgda_service/algorithm/matlab/NF08_reading_files_FIS.m similarity index 100% rename from nfgda_service/matlab/NF08_reading_files_FIS.m rename to nfgda_service/algorithm/matlab/NF08_reading_files_FIS.m diff --git a/nfgda_service/matlab/NF10_evalfuzzy_and_skel.m b/nfgda_service/algorithm/matlab/NF10_evalfuzzy_and_skel.m similarity index 100% rename from nfgda_service/matlab/NF10_evalfuzzy_and_skel.m rename to nfgda_service/algorithm/matlab/NF10_evalfuzzy_and_skel.m diff --git a/nfgda_service/matlab/NF13_making_stats_evaluation.m b/nfgda_service/algorithm/matlab/NF13_making_stats_evaluation.m similarity index 100% rename from nfgda_service/matlab/NF13_making_stats_evaluation.m rename to nfgda_service/algorithm/matlab/NF13_making_stats_evaluation.m diff --git a/nfgda_service/matlab/NF14_making_stats_scores_final.m b/nfgda_service/algorithm/matlab/NF14_making_stats_scores_final.m similarity index 100% rename from nfgda_service/matlab/NF14_making_stats_scores_final.m rename to nfgda_service/algorithm/matlab/NF14_making_stats_scores_final.m diff --git a/nfgda_service/matlab/NFFIG_v2.m b/nfgda_service/algorithm/matlab/NFFIG_v2.m similarity index 100% rename from nfgda_service/matlab/NFFIG_v2.m rename to nfgda_service/algorithm/matlab/NFFIG_v2.m diff --git a/nfgda_service/matlab/NF_make_new_fis.m b/nfgda_service/algorithm/matlab/NF_make_new_fis.m similarity index 100% rename from nfgda_service/matlab/NF_make_new_fis.m rename to nfgda_service/algorithm/matlab/NF_make_new_fis.m diff --git a/nfgda_service/matlab/NF_single_run.m b/nfgda_service/algorithm/matlab/NF_single_run.m similarity index 100% rename from nfgda_service/matlab/NF_single_run.m rename to nfgda_service/algorithm/matlab/NF_single_run.m diff --git a/nfgda_service/matlab/NF_tunefis.m 
b/nfgda_service/algorithm/matlab/NF_tunefis.m similarity index 100% rename from nfgda_service/matlab/NF_tunefis.m rename to nfgda_service/algorithm/matlab/NF_tunefis.m diff --git a/nfgda_service/matlab/boonlib.m b/nfgda_service/algorithm/matlab/boonlib.m similarity index 100% rename from nfgda_service/matlab/boonlib.m rename to nfgda_service/algorithm/matlab/boonlib.m diff --git a/nfgda_service/matlab/boonlib_original.m b/nfgda_service/algorithm/matlab/boonlib_original.m similarity index 100% rename from nfgda_service/matlab/boonlib_original.m rename to nfgda_service/algorithm/matlab/boonlib_original.m diff --git a/nfgda_service/matlab/cbfreeze.m b/nfgda_service/algorithm/matlab/cbfreeze.m similarity index 100% rename from nfgda_service/matlab/cbfreeze.m rename to nfgda_service/algorithm/matlab/cbfreeze.m diff --git a/nfgda_service/matlab/cbhandle.m b/nfgda_service/algorithm/matlab/cbhandle.m similarity index 100% rename from nfgda_service/matlab/cbhandle.m rename to nfgda_service/algorithm/matlab/cbhandle.m diff --git a/nfgda_service/matlab/cblabel.m b/nfgda_service/algorithm/matlab/cblabel.m similarity index 100% rename from nfgda_service/matlab/cblabel.m rename to nfgda_service/algorithm/matlab/cblabel.m diff --git a/nfgda_service/matlab/cbunits.m b/nfgda_service/algorithm/matlab/cbunits.m similarity index 100% rename from nfgda_service/matlab/cbunits.m rename to nfgda_service/algorithm/matlab/cbunits.m diff --git a/nfgda_service/matlab/cmapping.m b/nfgda_service/algorithm/matlab/cmapping.m similarity index 100% rename from nfgda_service/matlab/cmapping.m rename to nfgda_service/algorithm/matlab/cmapping.m diff --git a/nfgda_service/matlab/fis2python.m b/nfgda_service/algorithm/matlab/fis2python.m similarity index 100% rename from nfgda_service/matlab/fis2python.m rename to nfgda_service/algorithm/matlab/fis2python.m diff --git a/nfgda_service/matlab/freezeColors.m b/nfgda_service/algorithm/matlab/freezeColors.m similarity index 100% rename from 
nfgda_service/matlab/freezeColors.m rename to nfgda_service/algorithm/matlab/freezeColors.m diff --git a/nfgda_service/matlab/gen_tot_score.m b/nfgda_service/algorithm/matlab/gen_tot_score.m similarity index 100% rename from nfgda_service/matlab/gen_tot_score.m rename to nfgda_service/algorithm/matlab/gen_tot_score.m diff --git a/nfgda_service/matlab/ini2struct.m b/nfgda_service/algorithm/matlab/ini2struct.m similarity index 100% rename from nfgda_service/matlab/ini2struct.m rename to nfgda_service/algorithm/matlab/ini2struct.m diff --git a/nfgda_service/matlab/parseBoolean.m b/nfgda_service/algorithm/matlab/parseBoolean.m similarity index 100% rename from nfgda_service/matlab/parseBoolean.m rename to nfgda_service/algorithm/matlab/parseBoolean.m diff --git a/nfgda_service/matlab/performance_analysis/Deque.m b/nfgda_service/algorithm/matlab/performance_analysis/Deque.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/Deque.m rename to nfgda_service/algorithm/matlab/performance_analysis/Deque.m diff --git a/nfgda_service/matlab/performance_analysis/arclength.m b/nfgda_service/algorithm/matlab/performance_analysis/arclength.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/arclength.m rename to nfgda_service/algorithm/matlab/performance_analysis/arclength.m diff --git a/nfgda_service/matlab/performance_analysis/binary_search_rotation.m b/nfgda_service/algorithm/matlab/performance_analysis/binary_search_rotation.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/binary_search_rotation.m rename to nfgda_service/algorithm/matlab/performance_analysis/binary_search_rotation.m diff --git a/nfgda_service/matlab/performance_analysis/calculate_metrics.m b/nfgda_service/algorithm/matlab/performance_analysis/calculate_metrics.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/calculate_metrics.m rename to 
nfgda_service/algorithm/matlab/performance_analysis/calculate_metrics.m diff --git a/nfgda_service/matlab/performance_analysis/cc2groups.m b/nfgda_service/algorithm/matlab/performance_analysis/cc2groups.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/cc2groups.m rename to nfgda_service/algorithm/matlab/performance_analysis/cc2groups.m diff --git a/nfgda_service/matlab/performance_analysis/get_rotation_matrix.m b/nfgda_service/algorithm/matlab/performance_analysis/get_rotation_matrix.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/get_rotation_matrix.m rename to nfgda_service/algorithm/matlab/performance_analysis/get_rotation_matrix.m diff --git a/nfgda_service/matlab/performance_analysis/polyfit_rotation.m b/nfgda_service/algorithm/matlab/performance_analysis/polyfit_rotation.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/polyfit_rotation.m rename to nfgda_service/algorithm/matlab/performance_analysis/polyfit_rotation.m diff --git a/nfgda_service/matlab/performance_analysis/polyval_rotation.m b/nfgda_service/algorithm/matlab/performance_analysis/polyval_rotation.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/polyval_rotation.m rename to nfgda_service/algorithm/matlab/performance_analysis/polyval_rotation.m diff --git a/nfgda_service/matlab/performance_analysis/prepare_gf_lines.m b/nfgda_service/algorithm/matlab/performance_analysis/prepare_gf_lines.m similarity index 100% rename from nfgda_service/matlab/performance_analysis/prepare_gf_lines.m rename to nfgda_service/algorithm/matlab/performance_analysis/prepare_gf_lines.m diff --git a/nfgda_service/matlab/plotInputMFs.m b/nfgda_service/algorithm/matlab/plotInputMFs.m similarity index 100% rename from nfgda_service/matlab/plotInputMFs.m rename to nfgda_service/algorithm/matlab/plotInputMFs.m diff --git a/nfgda_service/matlab/rot_score_back.m b/nfgda_service/algorithm/matlab/rot_score_back.m 
similarity index 100% rename from nfgda_service/matlab/rot_score_back.m rename to nfgda_service/algorithm/matlab/rot_score_back.m diff --git a/nfgda_service/matlab/tight_subplot.m b/nfgda_service/algorithm/matlab/tight_subplot.m similarity index 100% rename from nfgda_service/matlab/tight_subplot.m rename to nfgda_service/algorithm/matlab/tight_subplot.m diff --git a/nfgda_service/matlab/tmpCELLdatax.mat b/nfgda_service/algorithm/matlab/tmpCELLdatax.mat similarity index 100% rename from nfgda_service/matlab/tmpCELLdatax.mat rename to nfgda_service/algorithm/matlab/tmpCELLdatax.mat diff --git a/nfgda_service/matlab/tmpCELLdatax2.mat b/nfgda_service/algorithm/matlab/tmpCELLdatax2.mat similarity index 100% rename from nfgda_service/matlab/tmpCELLdatax2.mat rename to nfgda_service/algorithm/matlab/tmpCELLdatax2.mat diff --git a/nfgda_service/matlab/tmpCELLdatay.mat b/nfgda_service/algorithm/matlab/tmpCELLdatay.mat similarity index 100% rename from nfgda_service/matlab/tmpCELLdatay.mat rename to nfgda_service/algorithm/matlab/tmpCELLdatay.mat diff --git a/nfgda_service/matlab/tmpCELLdatay2.mat b/nfgda_service/algorithm/matlab/tmpCELLdatay2.mat similarity index 100% rename from nfgda_service/matlab/tmpCELLdatay2.mat rename to nfgda_service/algorithm/matlab/tmpCELLdatay2.mat diff --git a/nfgda_service/matlab/tracking_points/blob_cluster.m b/nfgda_service/algorithm/matlab/tracking_points/blob_cluster.m similarity index 100% rename from nfgda_service/matlab/tracking_points/blob_cluster.m rename to nfgda_service/algorithm/matlab/tracking_points/blob_cluster.m diff --git a/nfgda_service/matlab/tracking_points/calc_small_angle_diff.m b/nfgda_service/algorithm/matlab/tracking_points/calc_small_angle_diff.m similarity index 100% rename from nfgda_service/matlab/tracking_points/calc_small_angle_diff.m rename to nfgda_service/algorithm/matlab/tracking_points/calc_small_angle_diff.m diff --git a/nfgda_service/matlab/tracking_points/cluster_points.m 
b/nfgda_service/algorithm/matlab/tracking_points/cluster_points.m similarity index 100% rename from nfgda_service/matlab/tracking_points/cluster_points.m rename to nfgda_service/algorithm/matlab/tracking_points/cluster_points.m diff --git a/nfgda_service/matlab/tracking_points/cure_tracks.m b/nfgda_service/algorithm/matlab/tracking_points/cure_tracks.m similarity index 100% rename from nfgda_service/matlab/tracking_points/cure_tracks.m rename to nfgda_service/algorithm/matlab/tracking_points/cure_tracks.m diff --git a/nfgda_service/matlab/tracking_points/forecast.m b/nfgda_service/algorithm/matlab/tracking_points/forecast.m similarity index 100% rename from nfgda_service/matlab/tracking_points/forecast.m rename to nfgda_service/algorithm/matlab/tracking_points/forecast.m diff --git a/nfgda_service/matlab/tracking_points/get_time_hour_UTC.m b/nfgda_service/algorithm/matlab/tracking_points/get_time_hour_UTC.m similarity index 100% rename from nfgda_service/matlab/tracking_points/get_time_hour_UTC.m rename to nfgda_service/algorithm/matlab/tracking_points/get_time_hour_UTC.m diff --git a/nfgda_service/matlab/tracking_points/initialize_tracks.m b/nfgda_service/algorithm/matlab/tracking_points/initialize_tracks.m similarity index 100% rename from nfgda_service/matlab/tracking_points/initialize_tracks.m rename to nfgda_service/algorithm/matlab/tracking_points/initialize_tracks.m diff --git a/nfgda_service/matlab/tracking_points/intialize_gf_event.m b/nfgda_service/algorithm/matlab/tracking_points/intialize_gf_event.m similarity index 100% rename from nfgda_service/matlab/tracking_points/intialize_gf_event.m rename to nfgda_service/algorithm/matlab/tracking_points/intialize_gf_event.m diff --git a/nfgda_service/matlab/tracking_points/intialize_points.m b/nfgda_service/algorithm/matlab/tracking_points/intialize_points.m similarity index 100% rename from nfgda_service/matlab/tracking_points/intialize_points.m rename to 
nfgda_service/algorithm/matlab/tracking_points/intialize_points.m diff --git a/nfgda_service/matlab/tracking_points/map_points_to_2d_grid.m b/nfgda_service/algorithm/matlab/tracking_points/map_points_to_2d_grid.m similarity index 100% rename from nfgda_service/matlab/tracking_points/map_points_to_2d_grid.m rename to nfgda_service/algorithm/matlab/tracking_points/map_points_to_2d_grid.m diff --git a/nfgda_service/matlab/tracking_points/one_to_one_point_correspondence.m b/nfgda_service/algorithm/matlab/tracking_points/one_to_one_point_correspondence.m similarity index 100% rename from nfgda_service/matlab/tracking_points/one_to_one_point_correspondence.m rename to nfgda_service/algorithm/matlab/tracking_points/one_to_one_point_correspondence.m diff --git a/nfgda_service/matlab/tracking_points/plot_tracks.m b/nfgda_service/algorithm/matlab/tracking_points/plot_tracks.m similarity index 100% rename from nfgda_service/matlab/tracking_points/plot_tracks.m rename to nfgda_service/algorithm/matlab/tracking_points/plot_tracks.m diff --git a/nfgda_service/matlab/tracking_points/track_main.m b/nfgda_service/algorithm/matlab/tracking_points/track_main.m similarity index 100% rename from nfgda_service/matlab/tracking_points/track_main.m rename to nfgda_service/algorithm/matlab/tracking_points/track_main.m diff --git a/nfgda_service/nfgda/Celldp.npy b/nfgda_service/algorithm/nfgda/Celldp.npy similarity index 100% rename from nfgda_service/nfgda/Celldp.npy rename to nfgda_service/algorithm/nfgda/Celldp.npy diff --git a/nfgda_service/nfgda/Celldpw.npy b/nfgda_service/algorithm/nfgda/Celldpw.npy similarity index 100% rename from nfgda_service/nfgda/Celldpw.npy rename to nfgda_service/algorithm/nfgda/Celldpw.npy diff --git a/nfgda_service/nfgda/NF00ref_YHWANG_fis4python.mat b/nfgda_service/algorithm/nfgda/NF00ref_YHWANG_fis4python.mat similarity index 100% rename from nfgda_service/nfgda/NF00ref_YHWANG_fis4python.mat rename to 
nfgda_service/algorithm/nfgda/NF00ref_YHWANG_fis4python.mat diff --git a/nfgda_service/nfgda/NFGDA_load_config.py b/nfgda_service/algorithm/nfgda/NFGDA_load_config.py similarity index 100% rename from nfgda_service/nfgda/NFGDA_load_config.py rename to nfgda_service/algorithm/nfgda/NFGDA_load_config.py diff --git a/nfgda_service/nfgda/NF_Lib.py b/nfgda_service/algorithm/nfgda/NF_Lib.py similarity index 100% rename from nfgda_service/nfgda/NF_Lib.py rename to nfgda_service/algorithm/nfgda/NF_Lib.py diff --git a/nfgda_service/nfgda/__init__.py b/nfgda_service/algorithm/nfgda/__init__.py similarity index 100% rename from nfgda_service/nfgda/__init__.py rename to nfgda_service/algorithm/nfgda/__init__.py diff --git a/nfgda_service/nfgda/colorlevel.py b/nfgda_service/algorithm/nfgda/colorlevel.py similarity index 100% rename from nfgda_service/nfgda/colorlevel.py rename to nfgda_service/algorithm/nfgda/colorlevel.py diff --git a/nfgda_service/nfgda/math_kit.py b/nfgda_service/algorithm/nfgda/math_kit.py similarity index 100% rename from nfgda_service/nfgda/math_kit.py rename to nfgda_service/algorithm/nfgda/math_kit.py diff --git a/nfgda_service/nfgda/nf_path.py b/nfgda_service/algorithm/nfgda/nf_path.py similarity index 100% rename from nfgda_service/nfgda/nf_path.py rename to nfgda_service/algorithm/nfgda/nf_path.py diff --git a/nfgda_service/python/Bulk_Processing.py b/nfgda_service/algorithm/python/Bulk_Processing.py similarity index 100% rename from nfgda_service/python/Bulk_Processing.py rename to nfgda_service/algorithm/python/Bulk_Processing.py diff --git a/nfgda_service/python/FTC_imporve.ipynb b/nfgda_service/algorithm/python/FTC_imporve.ipynb similarity index 100% rename from nfgda_service/python/FTC_imporve.ipynb rename to nfgda_service/algorithm/python/FTC_imporve.ipynb diff --git a/nfgda_service/python/Krita_Fig.py b/nfgda_service/algorithm/python/Krita_Fig.py similarity index 100% rename from nfgda_service/python/Krita_Fig.py rename to 
nfgda_service/algorithm/python/Krita_Fig.py diff --git a/nfgda_service/python/Krita_mask_converter.py b/nfgda_service/algorithm/python/Krita_mask_converter.py similarity index 100% rename from nfgda_service/python/Krita_mask_converter.py rename to nfgda_service/algorithm/python/Krita_mask_converter.py diff --git a/nfgda_service/python/Krita_notes.txt b/nfgda_service/algorithm/python/Krita_notes.txt similarity index 100% rename from nfgda_service/python/Krita_notes.txt rename to nfgda_service/algorithm/python/Krita_notes.txt diff --git a/nfgda_service/python/NF01_convert_V06_to_mat.py b/nfgda_service/algorithm/python/NF01_convert_V06_to_mat.py similarity index 100% rename from nfgda_service/python/NF01_convert_V06_to_mat.py rename to nfgda_service/algorithm/python/NF01_convert_V06_to_mat.py diff --git a/nfgda_service/python/NFFig.ipynb b/nfgda_service/algorithm/python/NFFig.ipynb similarity index 100% rename from nfgda_service/python/NFFig.ipynb rename to nfgda_service/algorithm/python/NFFig.ipynb diff --git a/nfgda_service/python/NFFig.py b/nfgda_service/algorithm/python/NFFig.py similarity index 100% rename from nfgda_service/python/NFFig.py rename to nfgda_service/algorithm/python/NFFig.py diff --git a/nfgda_service/python/NFGDA.py b/nfgda_service/algorithm/python/NFGDA.py similarity index 100% rename from nfgda_service/python/NFGDA.py rename to nfgda_service/algorithm/python/NFGDA.py diff --git a/nfgda_service/python/NF_cross_section.py b/nfgda_service/algorithm/python/NF_cross_section.py similarity index 100% rename from nfgda_service/python/NF_cross_section.py rename to nfgda_service/algorithm/python/NF_cross_section.py diff --git a/nfgda_service/python/NF_forecast_operation.py b/nfgda_service/algorithm/python/NF_forecast_operation.py similarity index 100% rename from nfgda_service/python/NF_forecast_operation.py rename to nfgda_service/algorithm/python/NF_forecast_operation.py diff --git a/nfgda_service/python/NF_stochastic_forecast.py 
b/nfgda_service/algorithm/python/NF_stochastic_forecast.py similarity index 100% rename from nfgda_service/python/NF_stochastic_forecast.py rename to nfgda_service/algorithm/python/NF_stochastic_forecast.py diff --git a/nfgda_service/python/NF_trainset.py b/nfgda_service/algorithm/python/NF_trainset.py similarity index 100% rename from nfgda_service/python/NF_trainset.py rename to nfgda_service/algorithm/python/NF_trainset.py diff --git a/nfgda_service/python/download_nexrad_l2_data.py b/nfgda_service/algorithm/python/download_nexrad_l2_data.py similarity index 100% rename from nfgda_service/python/download_nexrad_l2_data.py rename to nfgda_service/algorithm/python/download_nexrad_l2_data.py diff --git a/nfgda_service/python/improveFAR.ipynb b/nfgda_service/algorithm/python/improveFAR.ipynb similarity index 100% rename from nfgda_service/python/improveFAR.ipynb rename to nfgda_service/algorithm/python/improveFAR.ipynb diff --git a/nfgda_service/python/set_path.sh b/nfgda_service/algorithm/python/set_path.sh similarity index 100% rename from nfgda_service/python/set_path.sh rename to nfgda_service/algorithm/python/set_path.sh diff --git a/nfgda_service/scripts/NFGDA.ini b/nfgda_service/algorithm/scripts/NFGDA.ini similarity index 100% rename from nfgda_service/scripts/NFGDA.ini rename to nfgda_service/algorithm/scripts/NFGDA.ini diff --git a/nfgda_service/scripts/NFGDA_Host.py b/nfgda_service/algorithm/scripts/NFGDA_Host.py similarity index 98% rename from nfgda_service/scripts/NFGDA_Host.py rename to nfgda_service/algorithm/scripts/NFGDA_Host.py index d27745d..ca26f84 100644 --- a/nfgda_service/scripts/NFGDA_Host.py +++ b/nfgda_service/algorithm/scripts/NFGDA_Host.py @@ -296,11 +296,11 @@ async def shutdown(self): # await asyncio.gather(*self._tasks, return_exceptions=True), - logger.info("shutting down process pools (wait=False)") - self.dl_pool.shutdown(wait=False) - self.ng_pool.shutdown(wait=False) - self.df_pool.shutdown(wait=False) - 
self.sf_pool.shutdown(wait=False) + logger.info("shutting down process pools") + self.dl_pool.shutdown(wait=True, cancel_futures=True) + self.ng_pool.shutdown(wait=True, cancel_futures=True) + self.df_pool.shutdown(wait=True, cancel_futures=True) + self.sf_pool.shutdown(wait=True, cancel_futures=True) logger.info("shutdown complete") async def delay_shutdown(self, timeout=3600): diff --git a/nfgda_service/main.py b/nfgda_service/main.py index 6c1f1b5..ce070a8 100644 --- a/nfgda_service/main.py +++ b/nfgda_service/main.py @@ -1,7 +1,9 @@ import os +import shutil import asyncio import redis import logging +from datetime import datetime, timezone from nfgda_service import NfgdaService from process_output import generate_geotiff_output @@ -11,19 +13,51 @@ ) logger = logging.getLogger(__name__) -MAX_CONCURRENT_JOBS = int(os.getenv("MAX_CONCURRENT_JOBS", "1")) - redis_client = redis.Redis(host=os.getenv("REDIS_HOST"), port=int(os.getenv("REDIS_PORT", "6379")), db=int(os.getenv("REDIS_DB", "0")), decode_responses=True) # semaphor manages how many jobs can run at once -job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS) +job_semaphore = asyncio.Semaphore(int(os.getenv("MAX_CONCURRENT_JOBS", "2"))) + +async def listen_for_jobs() -> None: + """Poll Redis for jobs and dispatch them as async tasks. + + Only dequeues a job when the semaphore has capacity, so jobs + remain in job_queue and their queue position stays accurate. + Expired-job cleanup runs between poll cycles. 
+ """ + logger.info("NFGDA service started") + logger.info("listening for jobs (max %d concurrent)", int(os.getenv("MAX_CONCURRENT_JOBS", "2"))) + + loop = asyncio.get_running_loop() + + while True: + # periodic cleanup of expired job assets + await loop.run_in_executor(None, cleanup_expired_jobs) + + # wait until there's capacity to process a job + await job_semaphore.acquire() + + # dequeue with a timeout, run cleanup when idle + result = await loop.run_in_executor( + None, lambda: redis_client.brpop("job_queue", timeout=10) + ) + + if result is None: + job_semaphore.release() + continue + + _, job_id = result + logger.info("dequeued job %s", job_id) + + # run that job and release the semaphore when done + asyncio.create_task(run_and_release_job(job_id)) async def process_job(job_id: str) -> None: """Process a single job after being acquired from the queue.""" + job_key = f"job:{job_id}" job_fields = redis_client.hgetall(job_key) out_dir = format_output_directory(job_id) - upate_redis_with_output_dir(job_id, out_dir) logger.info("processing job %s", job_id) service = NfgdaService(redis_client, job_id, job_fields, out_dir) @@ -32,17 +66,18 @@ async def process_job(job_id: str) -> None: def process_geotiff_output(job_id: str) -> None: """Process the output of the NFGDA algorithm for a given job into a stack of GeoTIFFs for final display on the frontend.""" + if redis_client.hget(f"job:{job_id}", "status") == "FAILED": return - logger.info("Generating GeoTIFF series for job %s", job_id) + logger.info("generating GeoTIFF series for job %s", job_id) result = generate_geotiff_output(job_id, redis_client) if result is not None: - logger.error("Failed to generate GeoTIFF series for job %s. Error message: %s", job_id, result) + logger.error("failed to generate GeoTIFF series for job %s. 
Error message: %s", job_id, result) redis_client.hset(f"job:{job_id}", mapping={"status": "FAILED", "error_message": result}) else: - logger.info("Successfully generated GeoTIFF series for job %s", job_id) + logger.info("successfully generated GeoTIFF series for job %s", job_id) redis_client.hset(f"job:{job_id}", mapping={"status": "COMPLETED", "num_frames": len(os.listdir(f"/processed_data/{job_id}"))}) async def run_and_release_job(job_id: str) -> None: @@ -54,38 +89,48 @@ async def run_and_release_job(job_id: str) -> None: # they took my jerb! job_semaphore.release() -def upate_redis_with_output_dir(job_id: str, out_dir: str) -> None: - """Update Redis with the output directory for a job.""" - redis_client.hset(f"job:{job_id}", "outputDir", out_dir) - def format_output_directory(job_id: str) -> str: """Format the output directory for a job.""" out_dir = f"/nfgda_output/{job_id}/" os.makedirs(out_dir, exist_ok=True) return out_dir +def cleanup_expired_jobs() -> None: + """Scan Redis for job records whose asset_expiry_timestamp has passed. -async def listen_for_jobs() -> None: - """Poll Redis for jobs and dispatch them as async tasks. - - Only dequeues a job when the semaphore has capacity, so jobs - remain in job_queue and their queue position stays accurate. + For each expired job: + 1. Delete its output directories (/nfgda_output/ and /processed_data/). + 2. Remove the job hash from Redis. 
""" - logger.info("NFGDA service started") - logger.info("listening for jobs (max %d concurrent)", MAX_CONCURRENT_JOBS) - - loop = asyncio.get_running_loop() - - while True: - # Wait until there's capacity to process a job - await job_semaphore.acquire() - - #then dequeue job details from redis - _, job_id = await loop.run_in_executor(None, lambda: redis_client.brpop("job_queue", timeout=0)) - logger.info("dequeued job %s", job_id) - - # run that job and release the semaphore when done - asyncio.create_task(run_and_release_job(job_id)) + now = datetime.now(timezone.utc) + + for key in redis_client.scan_iter(match="job:*"): + expiry_str = redis_client.hget(key, "asset_expiry_timestamp") + if not expiry_str: + continue + + try: + expiry = datetime.strptime(expiry_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + except ValueError: + logger.warning("skipping %s due to malformed expiry timestamp: %s", key, expiry_str) + continue + + if now < expiry: + continue + + # extract job_id from the key ("job:") + job_id = key.split(":", 1)[1] + logger.info("cleaning up expired job %s (expired %s)", job_id, expiry_str) + + # remove output directories + for base in ("/nfgda_output", "/processed_data"): + path = os.path.join(base, job_id) + if os.path.isdir(path): + shutil.rmtree(path) + logger.info("- removed %s", path) + + # remove the job record from redis + redis_client.delete(key) def main(): asyncio.run(listen_for_jobs()) diff --git a/nfgda_service/nfgda_runner.py b/nfgda_service/nfgda_runner.py index 1e2232f..c06efee 100644 --- a/nfgda_service/nfgda_runner.py +++ b/nfgda_service/nfgda_runner.py @@ -6,14 +6,12 @@ logger = logging.getLogger(__name__) -MAX_NO_DATA_POLLS = int(os.getenv("MAX_NO_DATA_POLLS", "10")) - class NfgdaRunner: - """Executes the NFGDA algorithm for a given run request.""" + """ executes the NFGDA algorithm for a given run request. 
""" def __init__(self, station_id: str, start_utc: str, end_utc: str, job_id: str, out_dir: str) -> None: """ - Initialize the NfgdaRunner with the given parameters. + initialize the NfgdaRunner with the given parameters. Args: station_id (str): The station code. @@ -42,42 +40,47 @@ async def run(self): Returns: bool: True if the NFGDA process completed successfully, False otherwise. """ - + + # get the number of consecutive no data polls to allow before killing the process + no_data_polls = int(os.getenv("MAX_NO_DATA_POLLS", "10")) + logger.info(f"timebox parameters set to start_utc: {self.start_utc}, end_utc: {self.end_utc}") + # create a temporary config file for the algorithm config_path = self.create_temp_config(self.out_dir) logger.info("setting environment variable NFGDA_CONFIG_PATH to %s", config_path) + # if the config file couldn't be created for some reason return False if config_path is None: return False, "Failed to create config file" logger.info("running algorithm for job %s", self.job_id) - state = {"no_data_count": 0, "fatal_error_count": 0} + state = {"no_data_count": 0, "fatal_error_count": 0, "clean_shutdown": False} try: - # Build an env dict with the per-job config path + # build an env dict with the per-job config path env = os.environ.copy() env["NFGDA_CONFIG_PATH"] = config_path # asyncio manages the wait from the spawned algorithm subprocess(es) proc = await asyncio.create_subprocess_exec( - "python", "-u", "/app/scripts/NFGDA_Host.py", + "python", "-u", "/app/algorithm/scripts/NFGDA_Host.py", env=env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - # Stream stdout and stderr from algorithm subprocesses line-by-line in real time + # stream stdout and stderr from algorithm subprocesses line-by-line in real time stream_tasks = [ asyncio.create_task(self.stream_pipe(proc.stdout, "stdout")), asyncio.create_task(self.monitored_stream(proc.stderr, "stderr", proc, state)), ] - # Wait for the algorithm to complete, with a 
timeout + # wait for the algorithm to complete, with a timeout try: await asyncio.wait_for(proc.wait(), timeout=self.algo_timeout_seconds) except asyncio.TimeoutError: - logger.error("NFGDA algorithm timed out — killing process") + logger.error("NFGDA algorithm timed out. Killing process") proc.kill() await proc.wait() return False, "NFGDA algorithm timed out" @@ -85,23 +88,33 @@ async def run(self): # flush remaining buffered output await asyncio.gather(*stream_tasks) - # Check if the algorithm was killed due to a data gap - if state["no_data_count"] >= MAX_NO_DATA_POLLS: + # check if the algorithm was killed due to a lack of data in the nexrad S3 bucket + if state["no_data_count"] >= no_data_polls: logger.error( - "NFGDA process killed due to data gap — no scans found after %d consecutive polls", - MAX_NO_DATA_POLLS, + "NFGDA process killed due to data gap. No scans found after %d consecutive polls", + no_data_polls, ) - return False, f"No radar data found after {MAX_NO_DATA_POLLS} polls" + return False, f"No radar data found after {no_data_polls} polls" - # Check if the algorithm exited with a non-zero return code + # check if the algorithm exited with a non-zero return code if proc.returncode != 0: - logger.error( - "NFGDA algorithm exited with code %d", - proc.returncode, - ) - return False, f"an error occurred processing the algorithm. Error code: {proc.returncode}" - - # Check if fatal errors were logged during processing + if state["clean_shutdown"] and state["fatal_error_count"] == 0: + # the algorithm completed its work but the process exited + # non-zero due to a benign Python shutdown error (e.g. the + # ProcessPoolExecutor "Bad file descriptor" race condition). + # treat this as a success so the pipeline can continue. 
+ logger.warning( + "NFGDA algorithm exited with code %d but reported a clean shutdown — treating as success", + proc.returncode, + ) + else: + logger.error( + "NFGDA algorithm exited with code %d", + proc.returncode, + ) + return False, f"an error occurred processing the algorithm. Error code: {proc.returncode}" + + # check if fatal errors were logged during processing if state["fatal_error_count"] > 0: logger.error( "NFGDA algorithm reported %d fatal error(s) during processing", @@ -118,16 +131,20 @@ async def run(self): @staticmethod def iso_to_csv_time(iso_str: str) -> str: - """Convert an ISO 8601 timestamp (e.g. '2024-07-07T01:22:24Z') to the + """ convert an ISO 8601 timestamp (e.g. '2024-07-07T01:22:24Z') to the comma-separated format that NFGDA config asks for (e.g. 'year,month,day,hour,minute,second'). + + Why tf did they format their timestamps like this??? """ - dt = datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) - return f"{dt.year},{dt.month},{dt.day},{dt.hour},{dt.minute},{dt.second}" + # convert the iso string to a datetime object + date_time = datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + return f"{date_time.year},{date_time.month},{date_time.day},{date_time.hour},{date_time.minute},{date_time.second}" def create_temp_config(self, out_dir: str) -> str: - """Create a temporary NFGDA config file. Returns the path to the file.""" + """ create a temporary NFGDA config file. 
Returns the path to the file.""" - if not os.path.exists("/app/scripts/NFGDA.ini"): + # brief check against the config file shipped w/ the original algo for SnG + if not os.path.exists("/app/algorithm/scripts/NFGDA.ini"): logger.warning("NFGDA.ini default config not found (proceeding anyway)") csv_start = self.iso_to_csv_time(self.start_utc) @@ -135,6 +152,7 @@ def create_temp_config(self, out_dir: str) -> str: logger.info("config times: start=%s -> %s, end=%s -> %s", self.start_utc, csv_start, self.end_utc, csv_end) + # write the config file out with tempfile.NamedTemporaryFile("w", delete=False, suffix=".ini", prefix="nfgda_") as f: f.write(f"""[Settings] radar_id = {self.station_id} @@ -165,13 +183,17 @@ async def stream_pipe(stream, label: str): @staticmethod async def monitored_stream(stream, label: str, proc, state: dict): - """Read lines and, for stderr, count consecutive no-data polls. + """ read logs, monitor for no data and fatal errors. Kill wonky processes Args: stream: asyncio subprocess stream (stdout or stderr). - MAX_NO_DATA_POLLS: Kill the process after this many consecutive + no_data_polls: Kill the process after this many consecutive "no new scans found" messages. """ + + no_data_polls = int(os.getenv("MAX_NO_DATA_POLLS", "10")) + + # read the log stream lines and check for patterns that indicate error while True: line = await stream.readline() if not line: @@ -180,11 +202,11 @@ async def monitored_stream(stream, label: str, proc, state: dict): logger.info("[NFGDA_Host %s] %s", label, text) if label == "stderr": - if "no new scans found" in text: + if "no new scans found" in text: state["no_data_count"] += 1 - if state["no_data_count"] >= MAX_NO_DATA_POLLS: + if state["no_data_count"] >= no_data_polls: logger.error( - "no data found after %d consecutive polls — killing process", + "no data found after %d consecutive polls. 
Killing process", state["no_data_count"], ) proc.kill() @@ -192,6 +214,9 @@ async def monitored_stream(stream, label: str, proc, state: dict): elif "new volume" in text: state["no_data_count"] = 0 + if "shutdown complete" in text: + state["clean_shutdown"] = True + if "fatal error" in text.lower(): state["fatal_error_count"] += 1 diff --git a/nfgda_service/nfgda_service.py b/nfgda_service/nfgda_service.py index 9ac604d..6f5cf5d 100644 --- a/nfgda_service/nfgda_service.py +++ b/nfgda_service/nfgda_service.py @@ -6,8 +6,7 @@ class NfgdaService: - """High-level service that orchestrates a single NFGDA run, including - job lifecycle updates in Redis.""" + """ high-level service that orchestrates a single NFGDA run, including job lifecycle updates in Redis. """ def __init__(self, redis_client, job_id: str, job_fields: dict, out_dir: str) -> None: self.redis_client = redis_client @@ -17,7 +16,7 @@ def __init__(self, redis_client, job_id: str, job_fields: dict, out_dir: str) -> self.out_dir = out_dir async def run(self) -> None: - """Execute the NFGDA algorithm and update job status in Redis.""" + """ execute the NFGDA algorithm and update job status in Redis. 
""" try: self.redis_client.hset(self.job_key, mapping={"status": "PROCESSING"}) @@ -34,13 +33,15 @@ async def run(self) -> None: ) success, message = await runner.run() + # update job status in redis if success: - logger.info("Algorithm processing for job %s completed successfully", self.job_id) + logger.info("algorithm processing for job %s completed successfully", self.job_id) else: + # no, this is patrick self.redis_client.hset(self.job_key, mapping={"status": "FAILED", "error_message": message}) - logger.warning("Job %s failed (runner returned falsy)", self.job_id) - logger.warning("Error message: %s", message) + logger.warning("job %s failed (runner returned falsy)", self.job_id) + logger.warning("error message: %s", message) except Exception as e: self.redis_client.hset(self.job_key, mapping={"status": "FAILED"}) - logger.exception("Job %s failed with exception: %s", self.job_id, e) + logger.exception("job %s failed with exception: %s", self.job_id, e) diff --git a/nfgda_service/process_output.py b/nfgda_service/process_output.py index bbd77c3..940e05b 100644 --- a/nfgda_service/process_output.py +++ b/nfgda_service/process_output.py @@ -252,13 +252,13 @@ def project_data(npz_path: str, radar_lat: float, radar_lon: float, out_dir: str ) # --------------------------- - # Write final GeoTIFF + # Write final Cloud-Optimized GeoTIFF # --------------------------- - driver = gdal.GetDriverByName("GTiff") + driver = gdal.GetDriverByName("COG") driver.CreateCopy( final_tif, warped_ds, - options=["COMPRESS=DEFLATE", "TILED=YES"] + options=["COMPRESS=DEFLATE", "OVERVIEWS=IGNORE_EXISTING"] ) warped_ds = None diff --git a/pyproject.toml b/pyproject.toml index ff63930..06c310c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,11 @@ [tool.ruff] line-length = 88 exclude = [ - "backend/src/nfgda_service/nfgda_algorithm", + "nfgda_service/algorithm", + "backend/src/nfgda_service", +] + +[tool.pytest.ini_options] +markers = [ + "slow: marks tests that poll for job 
completion (deselect with '-m \"not slow\"')", ] diff --git a/test_run_requests.py b/request_test_script.py similarity index 88% rename from test_run_requests.py rename to request_test_script.py index c511b4c..a721df1 100644 --- a/test_run_requests.py +++ b/request_test_script.py @@ -2,10 +2,10 @@ """ End-to-end integration test: -1. GET /APIs/stations → fetch the station list +1. GET /apis/stations → fetch the station list 2. Randomly select one station -3. POST /APIs/run → submit a job for that station -4. Poll GET /APIs/status until the job reaches COMPLETED (or FAILED) +3. POST /apis/run → submit a job for that station +4. Poll GET /apis/status until the job reaches COMPLETED (or FAILED) 5. GET /api/jobs//frames/ → fetch every produced frame """ @@ -29,8 +29,8 @@ def main(): # ── 1. Fetch station list ──────────────────────────────────────── print(f"\n{'='*60}") - print("Step 1: GET /APIs/stations") - resp = requests.get(f"{BASE_URL}/APIs/stations") + print("Step 1: GET /apis/stations") + resp = requests.get(f"{BASE_URL}/apis/stations") print(f" status: {resp.status_code}") if resp.status_code != 200: print(f" FAILED — {resp.text[:200]}") @@ -55,10 +55,10 @@ def main(): } print(f"\n{'='*60}") - print("Step 3: POST /APIs/run") + print("Step 3: POST /apis/run") print(f" payload: {json.dumps(payload, indent=2)}") - resp = requests.post(f"{BASE_URL}/APIs/run", json=payload) + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) body = resp.json() print(f" status: {resp.status_code}") print(f" response: {json.dumps(body, indent=2)}") @@ -81,7 +81,7 @@ def main(): time.sleep(5) print(f"\n --- Poll #{poll} ---") - resp = requests.get(f"{BASE_URL}/APIs/status", params={"job_id": job_id}) + resp = requests.get(f"{BASE_URL}/apis/status", params={"job_id": job_id}) try: status_body = resp.json() except requests.exceptions.JSONDecodeError: @@ -108,7 +108,7 @@ def main(): print(f"Step 5: Fetching {num_frames} frame(s) for job {job_id}") for index in range(num_frames): 
- url = f"{BASE_URL}/APIs/jobs/{job_id}/frames/{index}" + url = f"{BASE_URL}/apis/jobs/{job_id}/frames/{index}" print(f"\n GET {url}") resp = requests.get(url) if resp.status_code == 200: diff --git a/scripts/projectRadarData.py b/scripts/projectRadarData.py index 0441ef5..0cbb7d3 100755 --- a/scripts/projectRadarData.py +++ b/scripts/projectRadarData.py @@ -3,7 +3,6 @@ import argparse import numpy as np from osgeo import gdal, osr -import math # --------------------------- # Command line inputs diff --git a/test_endpoints.py b/test_endpoints.py new file mode 100644 index 0000000..4335ac5 --- /dev/null +++ b/test_endpoints.py @@ -0,0 +1,237 @@ +""" +End-to-end integration tests for the backend API endpoints. + +Requires the full Docker Compose stack to be running: + docker compose up -d --build + +Run with: + pytest test_endpoints.py -v + +Test order matters — later tests depend on state created by earlier ones +(e.g. a job must be submitted before its status can be polled). The +pytest-ordering plugin or explicit state sharing via module-level variables +is used to enforce this. + +Endpoint coverage: + 1. GET /apis/stations → test_get_stations + 2. POST /apis/run → test_submit_job + 3. POST /apis/run (validation errors) → test_submit_job_* + 4. GET /apis/status?job_id= → test_poll_job_status + 5. GET /apis/jobs//frames/ → test_fetch_frames +""" + +import time +import pytest +import requests +from datetime import datetime, timedelta, timezone + +BASE_URL = "http://localhost:8001" +POLL_INTERVAL_SECONDS = 5 +MAX_POLLS = 120 + +# ── shared state across ordered tests ──────────────────────────────────────── +_state = { + "station_id": None, + "job_id": None, + "num_frames": 0, +} + + +def _fmt_utc(dt: datetime) -> str: + """Format a datetime as the ISO 8601 string the API expects.""" + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + +# ───────────────────────────────────────────────────────────────────────────── +# 1. 
GET /apis/stations +# ───────────────────────────────────────────────────────────────────────────── +class TestStationsEndpoint: + """Tests for GET /apis/stations.""" + + def test_get_stations_returns_200(self): + resp = requests.get(f"{BASE_URL}/apis/stations") + assert resp.status_code == 200 + + def test_get_stations_returns_geojson(self): + resp = requests.get(f"{BASE_URL}/apis/stations") + body = resp.json() + assert "features" in body, "response should be a GeoJSON FeatureCollection" + assert isinstance(body["features"], list) + assert len(body["features"]) > 0, "station list should not be empty" + + def test_station_feature_has_expected_properties(self): + resp = requests.get(f"{BASE_URL}/apis/stations") + feature = resp.json()["features"][0] + assert "properties" in feature + assert "station_id" in feature["properties"] + assert "geometry" in feature + + # stash a station for use in later tests + _state["station_id"] = feature["properties"]["station_id"] + + +# ───────────────────────────────────────────────────────────────────────────── +# 2. 
POST /apis/run — validation error cases +# ───────────────────────────────────────────────────────────────────────────── +class TestRunEndpointValidation: + """Tests for POST /apis/run input validation (no job should be created).""" + + def test_missing_station_id_returns_400(self): + now = datetime.now(timezone.utc) + payload = { + "startUtc": _fmt_utc(now - timedelta(minutes=45)), + "endUtc": _fmt_utc(now - timedelta(minutes=25)), + } + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) + assert resp.status_code == 400 + assert "stationId" in resp.json().get("error", "").lower() or "stationid" in resp.json().get("error", "").lower() + + def test_invalid_station_id_returns_400(self): + now = datetime.now(timezone.utc) + payload = { + "stationId": "ZZZZ_NOT_REAL", + "startUtc": _fmt_utc(now - timedelta(minutes=45)), + "endUtc": _fmt_utc(now - timedelta(minutes=25)), + } + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) + assert resp.status_code == 400 + + def test_end_before_start_returns_400(self): + now = datetime.now(timezone.utc) + payload = { + "stationId": _state["station_id"] or "KABX", + "startUtc": _fmt_utc(now - timedelta(minutes=25)), + "endUtc": _fmt_utc(now - timedelta(minutes=45)), + } + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) + assert resp.status_code == 400 + + def test_future_end_time_returns_400(self): + now = datetime.now(timezone.utc) + payload = { + "stationId": _state["station_id"] or "KABX", + "startUtc": _fmt_utc(now - timedelta(minutes=30)), + "endUtc": _fmt_utc(now + timedelta(hours=1)), + } + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) + assert resp.status_code == 400 + + def test_end_time_too_recent_returns_400(self): + """endUtc within 5 minutes of now triggers live-polling mode in the algorithm — must be rejected.""" + now = datetime.now(timezone.utc) + payload = { + "stationId": _state["station_id"] or "KABX", + "startUtc": _fmt_utc(now - timedelta(minutes=30)), + "endUtc": 
_fmt_utc(now - timedelta(minutes=2)), + } + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) + assert resp.status_code == 400 + + def test_duration_too_short_returns_400(self): + now = datetime.now(timezone.utc) + payload = { + "stationId": _state["station_id"] or "KABX", + "startUtc": _fmt_utc(now - timedelta(minutes=10)), + "endUtc": _fmt_utc(now - timedelta(minutes=8)), + } + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) + assert resp.status_code == 400 + + +# ───────────────────────────────────────────────────────────────────────────── +# 3. POST /apis/run — happy path (submit a real job) +# ───────────────────────────────────────────────────────────────────────────── +class TestRunEndpointHappyPath: + """Submit a valid job and store the job_id for downstream tests.""" + + def test_submit_job_returns_202(self): + assert _state["station_id"], "station_id must be populated by TestStationsEndpoint" + + now = datetime.now(timezone.utc) + payload = { + "stationId": _state["station_id"], + "startUtc": _fmt_utc(now - timedelta(minutes=45)), + "endUtc": _fmt_utc(now - timedelta(minutes=25)), + } + resp = requests.post(f"{BASE_URL}/apis/run", json=payload) + assert resp.status_code == 202 + + body = resp.json() + assert "job_id" in body + _state["job_id"] = body["job_id"] + + +# ───────────────────────────────────────────────────────────────────────────── +# 4. 
GET /apis/status +# ───────────────────────────────────────────────────────────────────────────── +class TestStatusEndpoint: + """Tests for GET /apis/status.""" + + def test_missing_job_id_returns_400(self): + resp = requests.get(f"{BASE_URL}/apis/status") + assert resp.status_code == 400 + + def test_unknown_job_id_returns_404(self): + resp = requests.get(f"{BASE_URL}/apis/status", params={"job_id": "nonexistent-id"}) + assert resp.status_code == 404 + + def test_valid_job_returns_status(self): + assert _state["job_id"], "job_id must be populated by TestRunEndpointHappyPath" + resp = requests.get(f"{BASE_URL}/apis/status", params={"job_id": _state["job_id"]}) + assert resp.status_code == 200 + + body = resp.json() + assert "job_id" in body + assert "status" in body + assert body["status"] in {"PENDING", "PROCESSING", "COMPLETED", "FAILED"} + + @pytest.mark.slow + def test_poll_until_terminal(self): + """Poll the job until it reaches COMPLETED or FAILED (may take minutes).""" + assert _state["job_id"], "job_id must be populated by TestRunEndpointHappyPath" + + terminal = {"COMPLETED", "FAILED"} + for _ in range(MAX_POLLS): + time.sleep(POLL_INTERVAL_SECONDS) + resp = requests.get(f"{BASE_URL}/apis/status", params={"job_id": _state["job_id"]}) + body = resp.json() + if body.get("status") in terminal: + _state["num_frames"] = int(body.get("num_frames", 0)) + return # success — reached terminal state + + pytest.fail(f"Job {_state['job_id']} did not reach a terminal state after {MAX_POLLS} polls") + + +# ───────────────────────────────────────────────────────────────────────────── +# 5. 
GET /apis/jobs//frames/ +# ───────────────────────────────────────────────────────────────────────────── +class TestFramesEndpoint: + """Tests for GET /apis/jobs//frames/.""" + + def test_nonexistent_job_returns_404(self): + resp = requests.get(f"{BASE_URL}/apis/jobs/fake-job-id/frames/0") + assert resp.status_code == 404 + + @pytest.mark.slow + def test_fetch_all_frames(self): + """After job completion, every advertised frame should be retrievable.""" + if _state["num_frames"] == 0: + pytest.skip("No frames produced (job may have failed or not run yet)") + + job_id = _state["job_id"] + for i in range(_state["num_frames"]): + resp = requests.get(f"{BASE_URL}/apis/jobs/{job_id}/frames/{i}") + assert resp.status_code == 200, f"frame {i} returned {resp.status_code}" + assert len(resp.content) > 0, f"frame {i} was empty" + assert resp.headers.get("Content-Type") == "image/tiff" + + @pytest.mark.slow + def test_out_of_range_frame_returns_404(self): + """Requesting a frame index beyond what the job produced should 404.""" + if _state["num_frames"] == 0: + pytest.skip("No frames produced (job may have failed or not run yet)") + + resp = requests.get( + f"{BASE_URL}/apis/jobs/{_state['job_id']}/frames/{_state['num_frames'] + 100}" + ) + assert resp.status_code == 404