diff --git a/dags/miovision_hardware.py b/dags/miovision_hardware.py new file mode 100644 index 000000000..54816d75f --- /dev/null +++ b/dags/miovision_hardware.py @@ -0,0 +1,62 @@ +import sys +import os +import pendulum +from datetime import timedelta + +from airflow.decorators import dag, task +from airflow.models import Variable +from airflow.providers.postgres.hooks.postgres import PostgresHook + +try: + repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) + sys.path.insert(0, repo_path) + from dags.dag_functions import task_fail_slack_alert, get_readme_docmd + from volumes.miovision.api.configuration_info import ( + get_cameras, get_configuration_dates + ) +except: + raise ImportError("Cannot import DAG helper functions.") + +DAG_NAME = 'miovision_hardware' +DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) + +README_PATH = os.path.join(repo_path, 'volumes/miovision/api/readme.md') +DOC_MD = get_readme_docmd(README_PATH, DAG_NAME) + +default_args = { + 'owner': ','.join(DAG_OWNERS), + 'depends_on_past': False, + 'start_date': pendulum.datetime(2024, 12, 5, tz="America/Toronto"), + 'email_on_failure': False, + 'email_on_success': False, + 'retries': 0, + 'retry_delay': timedelta(minutes=5), + 'on_failure_callback': task_fail_slack_alert +} + +@dag( + dag_id=DAG_NAME, + default_args=default_args, + schedule='0 2 * * *', + catchup=False, + tags=["miovision", "data_pull"], + doc_md=DOC_MD +) +def pull_miovision_dag(): + + @task(retries = 1) + def pull_config_dates(): + mio_postgres = PostgresHook("miovision_api_bot") + with mio_postgres.get_conn() as conn: + get_configuration_dates(conn) + + @task(retries = 1) + def pull_camera_details(): + mio_postgres = PostgresHook("miovision_api_bot") + with mio_postgres.get_conn() as conn: + get_cameras(conn) + + pull_config_dates() + pull_camera_details() + +pull_miovision_dag() \ No newline at end of file diff --git a/volumes/miovision/api/configuration_info.py b/volumes/miovision/api/configuration_info.py new file mode 100644 index 000000000..e7e513d4b --- /dev/null +++ b/volumes/miovision/api/configuration_info.py @@ -0,0 +1,98 @@ +'''Script that can be run manually to update `miovision_api.camera_details`. + + +''' +import pytz +import os +from requests import Session +import pandas as pd +from psycopg2 import sql +from psycopg2.extras import execute_values +from datetime import datetime +import logging + +from airflow.hooks.base_hook import BaseHook + +from .intersection_tmc import get_intersection_info + +LOGGER = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +SQL_DIR = os.path.join(os.path.dirname(os.path.abspath(os.path.dirname(__file__))), 'sql') + +session = Session() +session.proxies = {} + +def headers(): + '''get api key from airflow variable.''' + api_key = BaseHook.get_connection('miovision_api_key') + headers = { + 'Content-Type': 'application/json', + 'apikey': api_key.extra_dejson['key'] + } + return headers + +URL_BASE = "https://api.miovision.one/api/v1" + +def get_cameras(conn): + intersections = get_intersection_info(conn) + cameras = pd.DataFrame() + HEADERS=headers() + #for each intersection, query it's camera details + for intersection in intersections: + response = session.get( + URL_BASE + f"/intersections/{intersection.id1}/cameras", + params={}, + headers=HEADERS, + proxies=session.proxies + ) + if response.status_code == 200: + cameras_i = pd.DataFrame(response.json()['cameras']) + if cameras_i.empty: + continue + cameras_i = cameras_i[['id', 'label']] + cameras_i = cameras_i.add_prefix('camera_') + cameras_i['intersection_id'] = intersection.id1 + cameras = pd.concat([cameras, cameras_i]) + else: + #don't need to fail this non-critical pipeline + LOGGER.info(f"Intersection {intersection.id1} recieved {response.status_code} error: {response.reason}") + + final = [tuple(x) for x in cameras.to_numpy()] #convert to tuples for inserting + + fpath = os.path.join(SQL_DIR, 'inserts/insert-camera_details.sql') + with open(fpath, 'r', encoding='utf-8') as file: + insert_query = sql.SQL(file.read()) + + # Get intersections currently stored in `miovision_api` on Postgres. + with conn.cursor() as cur: + execute_values(cur, insert_query, final) + +def get_configuration_dates(conn): + intersections = get_intersection_info(conn) + HEADERS=headers() + configs = [] + for intersection in intersections: + response = session.get( + URL_BASE + f"/intersections/{intersection.id1}/hardware/detectionConfiguration", + params={}, + headers=HEADERS, + proxies=session.proxies + ) + if response.status_code == 200: + if response.json()['lastUpdated'] is not None: + config_i = ( + intersection.uid, + datetime.fromtimestamp( + response.json()['lastUpdated']/1000, + tz=pytz.timezone('America/Toronto') + ) + ) + configs.append(config_i) + else: + #don't need to fail this non-critical pipeline + LOGGER.info(f"Intersection {intersection.id1} recieved {response.status_code} error: {response.reason}") + sql='''INSERT INTO miovision_api.configuration_updates (intersection_uid, updated_time) VALUES %s + ON CONFLICT (intersection_uid, updated_time) DO NOTHING''' + with conn.cursor() as cur: + execute_values(cur, sql, configs) \ No newline at end of file diff --git a/volumes/miovision/api/readme.md b/volumes/miovision/api/readme.md index 6d106b249..73d813353 100644 --- a/volumes/miovision/api/readme.md +++ b/volumes/miovision/api/readme.md @@ -1,30 +1,32 @@ - [Overview](#overview) - - [Relevant Calls and Outputs](#relevant-calls-and-outputs) - - [Turning Movement Count (TMC)](#turning-movement-count-tmc) - - [Turning Movement Count (TMC) Crosswalks](#turning-movement-count-tmc-crosswalks) - - [Error responses](#error-responses) - - [Input Files](#input-files) - - [How to run the api](#how-to-run-the-api) - - [TMCs (Volumes)](#tmcs-volumes) - - [Alerts](#alerts) - - [Classifications](#classifications) - - [API Classifications](#api-classifications) - - [Old Classifications (csv dumps and datalink)](#old-classifications-csv-dumps-and-datalink) - - [PostgreSQL Functions](#postgresql-functions) - - [Invalid Movements](#invalid-movements) - - [How the API works](#how-the-api-works) - - [Repulling data](#repulling-data) + - [Relevant Calls and Outputs](#relevant-calls-and-outputs) + - [Turning Movement Count TMC](#turning-movement-count-tmc) + - [Turning Movement Count TMC Crosswalks](#turning-movement-count-tmc-crosswalks) + - [Error responses](#error-responses) + - [Input Files](#input-files) + - [How to run the api](#how-to-run-the-api) + - [TMCs Volumes](#tmcs-volumes) + - [Alerts](#alerts) + - [Classifications](#classifications) + - [API Classifications](#api-classifications) + - [Old Classifications csv dumps and datalink](#old-classifications-csv-dumps-and-datalink) + - [PostgreSQL Functions](#postgresql-functions) + - [Invalid Movements](#invalid-movements) + - [How the API works](#how-the-api-works) + - [Repulling data](#repulling-data) - [Airflow DAGs](#airflow-dags) - - [**`miovision_pull`**](#miovision_pull) - - [`check_partitions` TaskGroup](#check_partitions-taskgroup) - - [`miovision_agg` TaskGroup](#miovision_agg-taskgroup) - - [`data_checks` TaskGroup](#data_checks-taskgroup) - - [**`miovision_check`**](#miovision_check) - - [Notes](#notes) + - [miovision_pull](#miovision_pull) + - [check_partitions TaskGroup](#check_partitions-taskgroup) + - [miovision_agg TaskGroup](#miovision_agg-taskgroup) + - [data_checks TaskGroup](#data_checks-taskgroup) + - [miovision_check](#miovision_check) + - [miovision_hardware](#miovision_hardware) + - [Notes](#notes) + # Overview This readme contains information on the script used to pull data from the Miovision `intersection_tmc` API and descriptions of the Airflow DAGs which make use of the API scripts and [sql functions](../sql/readme.md#postgresql-functions) to pull, aggregate, and run data quality checks on new. @@ -302,6 +304,16 @@ This DAG replaces the old `check_miovision`. It is used to run daily data qualit - `check_open_anomalous_ranges`: Checks if any anomalous_range entries exist with non-zero volume in the last 7 days. Notifies if any found. + +## **`miovision_hardware`** + +This DAG pulls non-volume information including: camera details and configuration dates, which may be useful at some point. This DAG has no interdependency with the other Miovision pipelines. + +- `pull_config_dates`: pulls the **last configured** date from Miovision's `/v1/intersections/{intersectionId}/hardware/detectionConfiguration` API endpoint. This task only started running 2024-12-05, so older configurations dates were not captured. +- `pull_camera_details`: pulls the camera ids from the Miovision `/v1/intersections/{intersectionId}/cameras` endpoint. + + + ## Notes - `miovision_api.volume` table was truncated and re-run after the script was fixed and unique constraint was added to the table. Data from July 1st - Nov 21st, 2019 was inserted into the `miovision_api` schema on Nov 22nd, 2019 whereas the dates followed will be inserted into the table via airflow. diff --git a/volumes/miovision/sql/inserts/insert-camera_details.sql b/volumes/miovision/sql/inserts/insert-camera_details.sql new file mode 100644 index 000000000..0bafb8ced --- /dev/null +++ b/volumes/miovision/sql/inserts/insert-camera_details.sql @@ -0,0 +1,20 @@ +WITH camera_details ( + id, camera_id, camera_label +) AS ( + VALUES %s --noqa: PRS +) + +INSERT INTO miovision_api.camera_details ( + intersection_id, camera_id, camera_label, last_seen +) +SELECT + cd.id, + cd.camera_id, + cd.camera_label, + CURRENT_DATE AS last_seen +FROM camera_details AS cd +LEFT JOIN miovision_api.intersections USING (id) +ON CONFLICT (intersection_id, camera_id) +DO UPDATE SET +camera_label = excluded.camera_label, +last_seen = excluded.last_seen; \ No newline at end of file diff --git a/volumes/miovision/sql/readme.md b/volumes/miovision/sql/readme.md index ea78b4658..9569c49f7 100644 --- a/volumes/miovision/sql/readme.md +++ b/volumes/miovision/sql/readme.md @@ -1,6 +1,5 @@ -- [1. Overview](#1-overview) - [2. `miovision_api` Table Structure](#2-miovision_api-table-structure) - [Miovision Data Relationships at a Glance](#miovision-data-relationships-at-a-glance) - [Key Tables](#key-tables) @@ -24,6 +23,8 @@ - [`intersection_movements`](#intersection_movements) - [`centreline_miovision`](#centreline_miovision) - [`alerts`](#alerts) + - [camera\_details](#camera_details) + - [configuration\_updates](#configuration_updates) - [Primary and Foreign Keys](#primary-and-foreign-keys) - [List of primary and foreign keys](#list-of-primary-and-foreign-keys) - [Other Tables](#other-tables) @@ -43,8 +44,6 @@ -# 1. Overview - This folder contains sql scripts used in both the API and the old data dump process. The [`csv_data/`](csv_data/) sub-folder contains `sql` files unique to processing the data from csv dumps. # 2. `miovision_api` Table Structure @@ -494,6 +493,24 @@ LEFT JOIN miovision_api.alerts AS a WHERE a.intersection_uid IS NULL ``` +### camera_details +This table contains details of Miovision cameras, which we are sometimes required to provide to maintenance. It is updated daily by `miovision_hardware` Airflow DAG. Join to `miovision_api.active_intersections AS ai ON ai.id = camera_details.intersection_id`. + +| column_name | data_type | sample | +|:----------------|:------------|:-------------------------------------| +| intersection_id | text | 253a327c-4e4b-4e4e-b3a9-c2c3e7753825 | +| camera_id | text | Miovision SmartView 360 NWC | +| camera_label | text | 1ebf4ec0-88fd-49ec-8cf4-3e0ae0af0128 | +| last_seen | date | 2024-12-05 | + +### configuration_updates +This table stores the last updated date of Miovision detection configurations. This may be useful at some point in the future to determine for which dates calibration studies are relevant. It was only populated starting 2024-12-05, so the MIN `updated_time` was the most recent update at that point. Further configuration details can be seen in Miovision One. + +| column_name | data_type | sample | +|:-----------------|:----------------------------|:---------------------------| +| intersection_uid | integer | 97 | +| updated_time | timestamp without time zone | 2024-02-23 03:09:36.684000 | + ## Primary and Foreign Keys To create explicit relationships between tables, `volumes`, `volume_15min_mvt`, `atr_mvt_uid` and `volume_15min` have primary and foreign keys. Primary keys are unique identifiers for each entry in the table, while foreign keys refer to a primary key in another table and show how an entry is related to that entry. diff --git a/volumes/miovision/sql/table/create-table-camera_details.sql b/volumes/miovision/sql/table/create-table-camera_details.sql new file mode 100644 index 000000000..63949a2e2 --- /dev/null +++ b/volumes/miovision/sql/table/create-table-camera_details.sql @@ -0,0 +1,28 @@ +-- Table: miovision_api.camera_details + +-- DROP TABLE IF EXISTS miovision_api.camera_details; + +CREATE TABLE IF NOT EXISTS miovision_api.camera_details +( + intersection_id text COLLATE pg_catalog."default" NOT NULL, + camera_id text COLLATE pg_catalog."default" NOT NULL, + camera_label text COLLATE pg_catalog."default", + last_seen date NOT NULL, + CONSTRAINT camera_details_pkey PRIMARY KEY (intersection_id, camera_id) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS miovision_api.camera_details +OWNER TO miovision_api_bot; + +REVOKE ALL ON TABLE miovision_api.camera_details FROM bdit_humans; + +GRANT SELECT ON TABLE miovision_api.camera_details TO bdit_humans; + +GRANT ALL ON TABLE miovision_api.camera_details TO miovision_admins; + +GRANT ALL ON TABLE miovision_api.camera_details TO miovision_api_bot; + +COMMENT ON TABLE miovision_api.camera_details +IS 'Miovision camera details. Updated by miovision_hardware Airflow DAG.'; diff --git a/volumes/miovision/sql/table/create-table-configuration_updates.sql b/volumes/miovision/sql/table/create-table-configuration_updates.sql new file mode 100644 index 000000000..74a1d850c --- /dev/null +++ b/volumes/miovision/sql/table/create-table-configuration_updates.sql @@ -0,0 +1,29 @@ +-- Table: miovision_api.configuration_updates + +-- DROP TABLE IF EXISTS miovision_api.configuration_updates; + +CREATE TABLE IF NOT EXISTS miovision_api.configuration_updates +( + intersection_uid integer NOT NULL, + updated_time timestamp without time zone NOT NULL, + CONSTRAINT mio_configuration_updates_pkey PRIMARY KEY ( + intersection_uid, updated_time + ) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS miovision_api.configuration_updates +OWNER TO miovision_admins; + +REVOKE ALL ON TABLE miovision_api.configuration_updates FROM bdit_humans; + +GRANT SELECT ON TABLE miovision_api.configuration_updates TO bdit_humans; + +GRANT ALL ON TABLE miovision_api.configuration_updates TO miovision_admins; + +GRANT INSERT, SELECT ON TABLE miovision_api.configuration_updates TO miovision_api_bot; + +COMMENT ON TABLE miovision_api.configuration_updates IS +'Stores Miovision camera configuration update timestamps. ' +'Updated daily. Only populated since 2024-12-05. '; \ No newline at end of file