KSI Open Data pipeline DAG#1343

Open
chmnata wants to merge 6 commits into master from ksi-opendata

chmnata commented Feb 3, 2026

What this pull request accomplishes:

  • Adds a DAG that refreshes the KSI open data materialized view and replaces the data in the table once the checks have passed

Issue(s) this solves:

What, in particular, needs to be reviewed:

  • DAG code
  • Mat view for creating the ksi table
  • The README can be updated after we finalize the text for the open data page

What needs to be done by a sysadmin after this PR is merged

ln -sf in airflow

task_id="check_dup_collision",
sql='''
SELECT COUNT(*) = 0 AS _check, 'There are '|| count(*) ||' deleted collision person pair comparing to last updated. '||
'\n```'||ARRAY_TO_STRING(array_agg(collision_id), ', ')||'```' AS missing

Did you mean to include per_no here?

SELECT COUNT(*) = 0 AS _check, 'There are '|| count(*) ||' of NULL fatal_no for fatals. '||
'\n```'||ARRAY_TO_STRING(array_agg(collision_id), ', ')||'```' AS missing
FROM(
SELECT collision_id FROM open_data_staging.ksi WHERE injury = 'Fatal' AND fatal_no IS NULL) AS diff;

You would know better than me, but is injury = 'Fatal' type safe?

(accdate, stname1, streetype1, dir1, stname2, streetype2, dir2, stname3, streetype3, dir3, per_inv, acclass, accloc, traffictl, impactype, visible, light, rdsfcond, changed, road_class, failtorem, longitude, latitude, veh_no, vehtype, initdir, per_no, invage, injury, safequip, drivact, drivcond, pedact, pedcond, manoeuver, pedtype, cyclistype, cycact, cyccond, road_user, fatal_no, wardname, division, neighbourhood, aggressive, distracted, city_damage, cyclist, motorcyclist, other_micromobility, older_adult, pedestrian, red_light, school_child, heavy_truck)::text AS records,
count(1)
FROM open_data_staging.ksi
group by collision_id, records

Should this be more specific, like duplicate collision_id + person_id?
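
To illustrate the narrower check suggested above, here is a minimal sketch that groups only by the key expected to be unique instead of by the whole record. It runs against an in-memory SQLite table so it is self-contained; the two-column `ksi` table and its sample rows are hypothetical, and it assumes `per_no` is the person identifier the comment refers to.

```python
import sqlite3

# Hypothetical miniature of open_data_staging.ksi with only the two key columns.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ksi (collision_id INTEGER, per_no INTEGER)")
conn.executemany(
    "INSERT INTO ksi VALUES (?, ?)",
    [(1, 1), (1, 2), (2, 1), (2, 1)],  # the pair (2, 1) appears twice
)

# Group only by the key that should be unique, not by every column.
duplicates = conn.execute(
    """
    SELECT collision_id, per_no, COUNT(*) AS n
    FROM ksi
    GROUP BY collision_id, per_no
    HAVING COUNT(*) > 1
    """
).fetchall()

print(duplicates)  # [(2, 1, 2)]
```

Grouping by the key alone also flags rows that share a key but differ in some other column, which a comparison of full records would miss.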

)
return check_deleted_collision, check_missing_person, check_null_fatal_no, check_dup_collisions

check_deleted_collision, check_missing_person, check_null_fatal_no, check_dup_collisions = data_checks()

what is this strange syntax? 😮
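
For context, the line being asked about is plain Python tuple unpacking: `data_checks()` returns four values separated by commas, and the assignment binds them to four names in order. A minimal sketch, using strings as hypothetical stand-ins for the task objects the real function returns:

```python
def data_checks():
    """Hypothetical stand-in: the real function returns four Airflow tasks."""
    deleted = "check_deleted_collision"
    missing = "check_missing_person"
    null_fatal = "check_null_fatal_no"
    dups = "check_dup_collisions"
    # Comma-separated values after `return` are packed into a single tuple.
    return deleted, missing, null_fatal, dups


# The tuple is unpacked positionally into four names in one assignment.
check_deleted_collision, check_missing_person, check_null_fatal_no, check_dup_collisions = data_checks()

# Equivalent without unpacking: keep the tuple and index into it.
checks = data_checks()
assert checks[0] == check_deleted_collision
```

The number of names on the left must match the number of returned values, otherwise Python raises a `ValueError`.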


check_deleted_collision, check_missing_person, check_null_fatal_no, check_dup_collisions = data_checks()

@task

Optional idea: you could make this a task.branch and get rid of decide_approval. You would just need to use ti.xcom_push instead of return for the XComs.
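
A rough sketch of what that suggestion could look like, kept free of Airflow imports so it runs on its own. In a branch task the return value names the downstream task to follow, so any data for later tasks has to be pushed to XCom explicitly. Everything here is an assumption for illustration: `FakeTaskInstance` stands in for Airflow's task instance, `summarize_changes` and its `new_rows` condition are hypothetical, and the task ids are assumed to match the operators in the DAG.

```python
class FakeTaskInstance:
    """Stand-in for the Airflow task instance; it only records xcom_push calls."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value


def summarize_changes(new_rows, ti):
    """Hypothetical body of a callable that would be decorated with @task.branch.

    The return value selects the next task, so the data that used to be
    returned is pushed to XCom by hand instead.
    """
    ti.xcom_push(key="new_rows", value=new_rows)
    # Assumed task ids; the real branching condition is not shown in the PR.
    return "approve_refresh" if new_rows > 0 else "skip_approval"


ti = FakeTaskInstance()
print(summarize_changes(12, ti), ti.xcoms)  # approve_refresh {'new_rows': 12}
```

With this shape the separate decision task is no longer needed, since the branch callable both records the value and picks the path.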


skip_approval = EmptyOperator(task_id="skip_approval")

approve_refresh = ApprovalOperator(

Do you want anything to happen when you hit reject? Right now it just skips the downstream tasks, right?

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.sdk import Variable

combine with from airflow.sdk above

pb.unit_name AS division
FROM gis.police_boundary AS pb
WHERE
ST_Intersects(pb.geom, ST_Setsrid(ST_makepoint(events.longitude, events.latitude), 4326))

You use this 10 times: ST_Setsrid(ST_makepoint(events.longitude, events.latitude), 4326)
Maybe you can create an initial LATERAL to compute it once, or create it upstream in the events table?

@@ -0,0 +1,135 @@
CREATE MATERIALIZED VIEW open_data_staging.ksi AS
SELECT
ROW_NUMBER() OVER (ORDER BY accdate, per_no) as uid,

Not sure if this is important, but you could get uids changing in this way.
An alternative I guess would be to have a lookup table of accdate, per_no, uid that gets inserted into and that stores the uids in a more permanent way.
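
The concern can be reproduced in a few lines. The sketch below mimics `ROW_NUMBER() OVER (ORDER BY accdate, per_no)` in plain Python and shows an existing row's uid shifting when an earlier-dated record arrives later, then contrasts it with a lookup that only appends new uids. The sample rows and both helper functions are hypothetical, and `(accdate, per_no)` is treated as the key only because that is what the comment proposes.

```python
from datetime import date


def assign_uids(rows):
    """Mimic ROW_NUMBER() OVER (ORDER BY accdate, per_no): number rows in sort order."""
    return {row: i for i, row in enumerate(sorted(rows), start=1)}


def assign_persistent(rows, lookup):
    """Lookup-table alternative: keep existing uids and append new ones at the end."""
    next_uid = max(lookup.values(), default=0) + 1
    for row in sorted(rows):
        if row not in lookup:
            lookup[row] = next_uid
            next_uid += 1
    return lookup


rows = [(date(2024, 1, 5), 1), (date(2024, 3, 2), 1)]
moved = (date(2024, 3, 2), 1)

before = assign_uids(rows)
lookup = assign_persistent(rows, {})

# A late-arriving record dated between the two existing ones.
rows.append((date(2024, 2, 1), 1))
after = assign_uids(rows)
lookup = assign_persistent(rows, lookup)

print(before[moved], after[moved])  # 2 3  (the uid changed on refresh)
print(lookup[moved])                # 2    (the uid stayed the same)
```

Whether shifting uids matters depends on whether open data consumers use `uid` to track records between refreshes.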
