Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions dags/miovision_hardware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import sys
import os
import pendulum
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

try:
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
from dags.dag_functions import task_fail_slack_alert, get_readme_docmd
from volumes.miovision.api.configuration_info import (
get_cameras, get_configuration_dates
)
except:
raise ImportError("Cannot import DAG helper functions.")

DAG_NAME = 'miovision_hardware'
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])

README_PATH = os.path.join(repo_path, 'volumes/miovision/api/readme.md')
DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)

default_args = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past': False,
'start_date': pendulum.datetime(2024, 12, 5, tz="America/Toronto"),
'email_on_failure': False,
'email_on_success': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': task_fail_slack_alert
}

@dag(
dag_id=DAG_NAME,
default_args=default_args,
schedule='0 2 * * *',
catchup=False,
tags=["miovision", "data_pull"],
doc_md=DOC_MD
)
def pull_miovision_dag():

@task(retries = 1)
def pull_config_dates():
mio_postgres = PostgresHook("miovision_api_bot")
with mio_postgres.get_conn() as conn:
get_configuration_dates(conn)

@task(retries = 1)
def pull_camera_details():
mio_postgres = PostgresHook("miovision_api_bot")
with mio_postgres.get_conn() as conn:
get_cameras(conn)

pull_config_dates()
pull_camera_details()

pull_miovision_dag()
98 changes: 98 additions & 0 deletions volumes/miovision/api/configuration_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
'''Script that can be run manually to update `miovision_api.camera_details`.


'''
import pytz
import os
from requests import Session
import pandas as pd
from psycopg2 import sql
from psycopg2.extras import execute_values
from datetime import datetime
import logging

from airflow.hooks.base_hook import BaseHook

from .intersection_tmc import get_intersection_info

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

SQL_DIR = os.path.join(os.path.dirname(os.path.abspath(os.path.dirname(__file__))), 'sql')

session = Session()
session.proxies = {}

def headers():
'''get api key from airflow variable.'''
api_key = BaseHook.get_connection('miovision_api_key')
headers = {
'Content-Type': 'application/json',
'apikey': api_key.extra_dejson['key']
}
return headers

URL_BASE = "https://api.miovision.one/api/v1"

def get_cameras(conn):
intersections = get_intersection_info(conn)
cameras = pd.DataFrame()
HEADERS=headers()
#for each intersection, query it's camera details
for intersection in intersections:
response = session.get(
URL_BASE + f"/intersections/{intersection.id1}/cameras",
params={},
headers=HEADERS,
proxies=session.proxies
)
if response.status_code == 200:
cameras_i = pd.DataFrame(response.json()['cameras'])
if cameras_i.empty:
continue
cameras_i = cameras_i[['id', 'label']]
cameras_i = cameras_i.add_prefix('camera_')
cameras_i['intersection_id'] = intersection.id1
cameras = pd.concat([cameras, cameras_i])
else:
#don't need to fail this non-critical pipeline
LOGGER.info(f"Intersection {intersection.id1} recieved {response.status_code} error: {response.reason}")

final = [tuple(x) for x in cameras.to_numpy()] #convert to tuples for inserting

fpath = os.path.join(SQL_DIR, 'inserts/insert-camera_details.sql')
with open(fpath, 'r', encoding='utf-8') as file:
insert_query = sql.SQL(file.read())

# Get intersections currently stored in `miovision_api` on Postgres.
with conn.cursor() as cur:
execute_values(cur, insert_query, final)

def get_configuration_dates(conn):
intersections = get_intersection_info(conn)
HEADERS=headers()
configs = []
for intersection in intersections:
response = session.get(
URL_BASE + f"/intersections/{intersection.id1}/hardware/detectionConfiguration",
params={},
headers=HEADERS,
proxies=session.proxies
)
if response.status_code == 200:
if response.json()['lastUpdated'] is not None:
config_i = (
intersection.uid,
datetime.fromtimestamp(
response.json()['lastUpdated']/1000,
tz=pytz.timezone('America/Toronto')
)
)
configs.append(config_i)
else:
#don't need to fail this non-critical pipeline
LOGGER.info(f"Intersection {intersection.id1} recieved {response.status_code} error: {response.reason}")
sql='''INSERT INTO miovision_api.configuration_updates (intersection_uid, updated_time) VALUES %s
ON CONFLICT (intersection_uid, updated_time) DO NOTHING'''
with conn.cursor() as cur:
execute_values(cur, sql, configs)
54 changes: 33 additions & 21 deletions volumes/miovision/api/readme.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
<!-- TOC -->

- [Overview](#overview)
- [Relevant Calls and Outputs](#relevant-calls-and-outputs)
- [Turning Movement Count (TMC)](#turning-movement-count-tmc)
- [Turning Movement Count (TMC) Crosswalks](#turning-movement-count-tmc-crosswalks)
- [Error responses](#error-responses)
- [Input Files](#input-files)
- [How to run the api](#how-to-run-the-api)
- [TMCs (Volumes)](#tmcs-volumes)
- [Alerts](#alerts)
- [Classifications](#classifications)
- [API Classifications](#api-classifications)
- [Old Classifications (csv dumps and datalink)](#old-classifications-csv-dumps-and-datalink)
- [PostgreSQL Functions](#postgresql-functions)
- [Invalid Movements](#invalid-movements)
- [How the API works](#how-the-api-works)
- [Repulling data](#repulling-data)
- [Relevant Calls and Outputs](#relevant-calls-and-outputs)
- [Turning Movement Count TMC](#turning-movement-count-tmc)
- [Turning Movement Count TMC Crosswalks](#turning-movement-count-tmc-crosswalks)
- [Error responses](#error-responses)
- [Input Files](#input-files)
- [How to run the api](#how-to-run-the-api)
- [TMCs Volumes](#tmcs-volumes)
- [Alerts](#alerts)
- [Classifications](#classifications)
- [API Classifications](#api-classifications)
- [Old Classifications csv dumps and datalink](#old-classifications-csv-dumps-and-datalink)
- [PostgreSQL Functions](#postgresql-functions)
- [Invalid Movements](#invalid-movements)
- [How the API works](#how-the-api-works)
- [Repulling data](#repulling-data)
- [Airflow DAGs](#airflow-dags)
- [**`miovision_pull`**](#miovision_pull)
- [`check_partitions` TaskGroup](#check_partitions-taskgroup)
- [`miovision_agg` TaskGroup](#miovision_agg-taskgroup)
- [`data_checks` TaskGroup](#data_checks-taskgroup)
- [**`miovision_check`**](#miovision_check)
- [Notes](#notes)
- [miovision_pull](#miovision_pull)
- [check_partitions TaskGroup](#check_partitions-taskgroup)
- [miovision_agg TaskGroup](#miovision_agg-taskgroup)
- [data_checks TaskGroup](#data_checks-taskgroup)
- [miovision_check](#miovision_check)
- [miovision_hardware](#miovision_hardware)
- [Notes](#notes)

<!-- /TOC -->
<!-- /TOC -->

# Overview
This readme contains information on the script used to pull data from the Miovision `intersection_tmc` API and descriptions of the Airflow DAGs which make use of the API scripts and [sql functions](../sql/readme.md#postgresql-functions) to pull, aggregate, and run data quality checks on new.
Expand Down Expand Up @@ -302,6 +304,16 @@ This DAG replaces the old `check_miovision`. It is used to run daily data qualit
- `check_open_anomalous_ranges`: Checks if any anomalous_range entries exist with non-zero volume in the last 7 days. Notifies if any found.
<!-- miovision_check_doc_md -->

<!-- miovision_hardware_doc_md -->
## **`miovision_hardware`**

This DAG pulls non-volume information including: camera details and configuration dates, which may be useful at some point. This DAG has no interdependency with the other Miovision pipelines.

- `pull_config_dates`: pulls the **last configured** date from Miovision's `/v1/intersections/{intersectionId}/hardware/detectionConfiguration` API endpoint. This task only started running 2024-12-05, so older configurations dates were not captured.
- `pull_camera_details`: pulls the camera ids from the Miovision `/v1/intersections/{intersectionId}/cameras` endpoint.

<!-- miovision_hardware_doc_md -->

## Notes

- `miovision_api.volume` table was truncated and re-run after the script was fixed and unique constraint was added to the table. Data from July 1st - Nov 21st, 2019 was inserted into the `miovision_api` schema on Nov 22nd, 2019 whereas the dates followed will be inserted into the table via airflow.
Expand Down
20 changes: 20 additions & 0 deletions volumes/miovision/sql/inserts/insert-camera_details.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
WITH camera_details (
id, camera_id, camera_label
) AS (
VALUES %s --noqa: PRS
)

INSERT INTO miovision_api.camera_details (
intersection_id, camera_id, camera_label, last_seen
)
SELECT
cd.id,
cd.camera_id,
cd.camera_label,
CURRENT_DATE AS last_seen
FROM camera_details AS cd
LEFT JOIN miovision_api.intersections USING (id)
ON CONFLICT (intersection_id, camera_id)
DO UPDATE SET
camera_label = excluded.camera_label,
last_seen = excluded.last_seen;
23 changes: 20 additions & 3 deletions volumes/miovision/sql/readme.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
<!-- TOC -->

- [1. Overview](#1-overview)
- [2. `miovision_api` Table Structure](#2-miovision_api-table-structure)
- [Miovision Data Relationships at a Glance](#miovision-data-relationships-at-a-glance)
- [Key Tables](#key-tables)
Expand All @@ -24,6 +23,8 @@
- [`intersection_movements`](#intersection_movements)
- [`centreline_miovision`](#centreline_miovision)
- [`alerts`](#alerts)
- [camera\_details](#camera_details)
- [configuration\_updates](#configuration_updates)
- [Primary and Foreign Keys](#primary-and-foreign-keys)
- [List of primary and foreign keys](#list-of-primary-and-foreign-keys)
- [Other Tables](#other-tables)
Expand All @@ -43,8 +44,6 @@

<!-- /TOC -->

# 1. Overview

This folder contains sql scripts used in both the API and the old data dump process. The [`csv_data/`](csv_data/) sub-folder contains `sql` files unique to processing the data from csv dumps.

# 2. `miovision_api` Table Structure
Expand Down Expand Up @@ -494,6 +493,24 @@ LEFT JOIN miovision_api.alerts AS a
WHERE a.intersection_uid IS NULL
```

### camera_details
This table contains details of Miovision cameras, which we are sometimes required to provide to maintenance. It is updated daily by `miovision_hardware` Airflow DAG. Join to `miovision_api.active_intersections AS ai ON ai.id = camera_details.intersection_id`.

| column_name | data_type | sample |
|:----------------|:------------|:-------------------------------------|
| intersection_id | text | 253a327c-4e4b-4e4e-b3a9-c2c3e7753825 |
| camera_id | text | Miovision SmartView 360 NWC |
| camera_label | text | 1ebf4ec0-88fd-49ec-8cf4-3e0ae0af0128 |
| last_seen | date | 2024-12-05 |

### configuration_updates
This table stores the last updated date of Miovision detection configurations. This may be useful at some point in the future to determine for which dates calibration studies are relevant. It was only populated starting 2024-12-05, so the MIN `updated_time` was the most recent update at that point. Further configuration details can be seen in Miovision One.

| column_name | data_type | sample |
|:-----------------|:----------------------------|:---------------------------|
| intersection_uid | integer | 97 |
| updated_time | timestamp without time zone | 2024-02-23 03:09:36.684000 |

## Primary and Foreign Keys

To create explicit relationships between tables, `volumes`, `volume_15min_mvt`, `atr_mvt_uid` and `volume_15min` have primary and foreign keys. Primary keys are unique identifiers for each entry in the table, while foreign keys refer to a primary key in another table and show how an entry is related to that entry.
Expand Down
28 changes: 28 additions & 0 deletions volumes/miovision/sql/table/create-table-camera_details.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Table: miovision_api.camera_details

-- DROP TABLE IF EXISTS miovision_api.camera_details;

CREATE TABLE IF NOT EXISTS miovision_api.camera_details
(
intersection_id text COLLATE pg_catalog."default" NOT NULL,
camera_id text COLLATE pg_catalog."default" NOT NULL,
camera_label text COLLATE pg_catalog."default",
last_seen date NOT NULL,
CONSTRAINT camera_details_pkey PRIMARY KEY (intersection_id, camera_id)
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS miovision_api.camera_details
OWNER TO miovision_api_bot;

REVOKE ALL ON TABLE miovision_api.camera_details FROM bdit_humans;

GRANT SELECT ON TABLE miovision_api.camera_details TO bdit_humans;

GRANT ALL ON TABLE miovision_api.camera_details TO miovision_admins;

GRANT ALL ON TABLE miovision_api.camera_details TO miovision_api_bot;

COMMENT ON TABLE miovision_api.camera_details
IS 'Miovision camera details. Updated by miovision_hardware Airflow DAG.';
29 changes: 29 additions & 0 deletions volumes/miovision/sql/table/create-table-configuration_updates.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Table: miovision_api.configuration_updates

-- DROP TABLE IF EXISTS miovision_api.configuration_updates;

CREATE TABLE IF NOT EXISTS miovision_api.configuration_updates
(
intersection_uid integer NOT NULL,
updated_time timestamp without time zone NOT NULL,
CONSTRAINT mio_configuration_updates_pkey PRIMARY KEY (
intersection_uid, updated_time
)
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS miovision_api.configuration_updates
OWNER TO miovision_admins;

REVOKE ALL ON TABLE miovision_api.configuration_updates FROM bdit_humans;

GRANT SELECT ON TABLE miovision_api.configuration_updates TO bdit_humans;

GRANT ALL ON TABLE miovision_api.configuration_updates TO miovision_admins;

GRANT INSERT, SELECT ON TABLE miovision_api.configuration_updates TO miovision_api_bot;

COMMENT ON TABLE miovision_api.configuration_updates IS
'Stores Miovision camera configuration update timestamps. '
'Updated daily. Only populated since 2024-12-05. ';
Loading