@@ -0,0 +1,12 @@
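-- One row per MMSI, holding the most recently received AIS position of that vessel,
-- as computed by the `last_positions` Prefect flow.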
CREATE TABLE public.last_positions
(
id SERIAL PRIMARY KEY,
mmsi INT UNIQUE,
vessel_id INT,
coord GEOMETRY,
status VARCHAR,
course SMALLINT,
heading SMALLINT,
speed SMALLINT,
ts TIMESTAMPTZ
);
2 changes: 1 addition & 1 deletion pipeline/config.py
@@ -123,4 +123,4 @@

# Vessel repository XML
VESSEL_FILES_GID = os.getenv("VESSEL_FILES_GID")
VESSEL_FILES_DIRECTORY = "/data/vessel_repository"
VESSEL_FILES_DIRECTORY = "/data/vessel_repository"
2 changes: 2 additions & 0 deletions pipeline/src/deployments.py
@@ -45,6 +45,7 @@
update_departments_and_facades_flow,
)
from src.flows.vessel_repository import vessel_repository_flow
from src.flows.last_positions import last_positions_flow

################################# List flows to deploy ################################

@@ -132,6 +133,7 @@ class FlowAndSchedules:
),
FlowAndSchedules(flow=update_departments_and_facades_flow),
FlowAndSchedules(flow=vessel_repository_flow),
FlowAndSchedules(flow=last_positions_flow),
]

deployments = []
108 changes: 108 additions & 0 deletions pipeline/src/flows/last_positions.py
@@ -0,0 +1,108 @@

import geopandas as gpd
import pandas as pd
from prefect import flow, get_run_logger, task

from src.generic_tasks import extract, load


@task
def validate_action(action: str) -> str:
"""
Checks that the received parameter value is valid and returns it. Raises ValueError
otherwise.

Args:
action (str): input parameter for the flow

Returns:
str: input, if valid

Raises:
ValueError: if the input is not valid
"""

valid_actions = {"update", "replace"}

if action in valid_actions:
return action
else:
raise ValueError(
f"action must be one of {', '.join(valid_actions)}, got {action}"
)

@task
def join(
previous_last_positions: pd.DataFrame,
last_positions: pd.DataFrame,
) -> pd.DataFrame:
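"""
Concatenates the previously stored last positions with the freshly extracted ones
and keeps, for each MMSI, only the most recent position (greatest `ts`).
"""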
last_positions = (
pd.concat(
[
previous_last_positions,
last_positions,
]
)
).sort_values("ts", ascending=False).groupby("mmsi").head(1)

return last_positions

@task
def extract_last_positions(minutes: int) -> gpd.GeoDataFrame:
"""
Extracts the last position of each vessel over the past `minutes` minutes.

Args:
minutes (int): number of minutes from current datetime to extract

Returns:
gpd.GeoDataFrame: GeoDataFrame of vessels' last position.
"""
return extract(
db_name="monitorenv_remote",
query_filepath="monitorenv/compute_last_positions.sql",
params={"minutes": minutes},
geom_col="coord"
)

@task
def extract_previous_last_positions() -> gpd.GeoDataFrame:
"""
Extracts the contents of the `last_positions` table, as computed by the previous
run of the `last_positions` flow.

Returns:
    gpd.GeoDataFrame: GeoDataFrame of vessels' last positions, as last computed
    by the `last_positions` flow.
"""
return extract(
db_name="monitorenv_remote",
query_filepath="monitorenv/previous_last_positions.sql",
geom_col="coord"
)

@task
def load_last_positions(last_positions):
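"""
Replaces the content of the `last_positions` table with the given positions.
"""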
load(
last_positions,
table_name="last_positions",
schema="public",
db_name="monitorenv_remote",
logger=get_run_logger(),
how="replace",
nullable_integer_columns=["vessel_id"]
)


@flow(name="Monitorenv - Last positions")
def last_positions_flow(minutes: int = 5, action: str = "update"):
action = validate_action(action)
last_positions = extract_last_positions(minutes=minutes)

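# In "update" mode, merge the fresh extraction with the positions already stored in
# the `last_positions` table; in "replace" mode, keep only the fresh extraction.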
if action == "update":
previous_last_positions = extract_previous_last_positions.submit()
last_positions = join(previous_last_positions, last_positions)

# Load
load_last_positions(last_positions)
2 changes: 1 addition & 1 deletion pipeline/src/processing.py
@@ -900,4 +900,4 @@ def rows_belong_to_sequence(
np.isnan(rows_known) & rows_maybe.astype(bool), np.nan, rows_maybe
)

return res
return res
41 changes: 41 additions & 0 deletions pipeline/src/queries/monitorenv/compute_last_positions.sql
@@ -0,0 +1,41 @@
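-- Most recent AIS position per MMSI received over the past :minutes minutes, joined
-- to the latest_vessels materialized view to resolve the corresponding vessel_id.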
WITH last_n_minutes_positions AS (
SELECT
id,
mmsi,
coord,
status,
course,
heading,
speed,
ts,
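-- rk = 1 marks the most recent position received for each MMSI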
ROW_NUMBER() OVER (
PARTITION BY mmsi
ORDER BY ts DESC, id DESC) AS rk
FROM ais_positions
WHERE ts > CURRENT_TIMESTAMP - make_interval(mins => :minutes)
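-- Exclude positions whose timestamp lies more than one day in the future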
AND ts < CURRENT_TIMESTAMP + INTERVAL '1 day'
AND mmsi IS NOT NULL
),

last_positions AS (
SELECT *
FROM last_n_minutes_positions
WHERE rk = 1
)

SELECT
-- The DISTINCT ON clause is required to remove possible duplicates due to vessels
-- for which we receive each position multiple times
DISTINCT ON (lp.mmsi)
lp.id,
lv.ship_id AS vessel_id,
lp.mmsi,
lp.coord,
lp.status,
lp.course,
lp.heading,
lp.speed,
lp.ts
FROM last_positions lp
LEFT JOIN latest_vessels lv
ON lp.mmsi = lv.mmsi_number::integer;
11 changes: 11 additions & 0 deletions pipeline/src/queries/monitorenv/previous_last_positions.sql
@@ -0,0 +1,11 @@
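-- Reads back the `last_positions` table as written by the previous run of the
-- last_positions flow.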
SELECT
pos.id,
pos.mmsi,
pos.vessel_id,
pos.coord,
pos.status,
pos.course,
pos.heading,
pos.speed,
pos.ts
FROM last_positions pos
6 changes: 6 additions & 0 deletions pipeline/tests/test_data/remote_database/V666.31__vessels.sql
@@ -0,0 +1,6 @@
TRUNCATE public.vessels;

INSERT INTO public.vessels (ship_id, mmsi_number, status, is_banned, batch_id, row_number) VALUES (1, '545437273', 'A', false, 1, 1),
(2, '755136766', 'A', false, 1, 1), (5, '851385830', 'A', false, 1, 1), (6, '598693403', 'A', false, 1, 1);

REFRESH MATERIALIZED VIEW public.latest_vessels;
@@ -0,0 +1,8 @@
TRUNCATE public.ais_positions;

INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (1, 545437273, 'POINT (-29.01623785635482 -47.158559321739695)', 'STATUT_1', 6666, 8632, 9542, NOW() - INTERVAL '10 minutes');
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (2, 755136766, 'POINT (-129.56689057250617 53.65961394926839)', 'STATUT_2', 5595, 4258, 533, NOW() - INTERVAL '10 minutes');
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (3, 92030123, 'POINT (11.05964276248551 2.8143566255156713)', 'STATUT_3', 5225, 1476, 3750, NOW() - INTERVAL '30 minutes');
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (4, 883168729, 'POINT (-104.19945639555388 -36.31227256604617)', 'STATUT_4', 6547, 4734, 1045, NOW() - INTERVAL '30 minutes');
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (5, 851385830, 'POINT (168.45242103624486 -44.45202385317484)', 'STATUT_5', 8090, 9890, 6750, NOW() - INTERVAL '1 day');
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (6, 598693403, 'POINT (78.18423409817728 40.52294043382852)', 'STATUT_6', 5954, 5087, 9486, '2020-12-01 00:00:00 +00:00');
@@ -0,0 +1,8 @@
TRUNCATE public.last_positions;

INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (1, 545437273, 'POINT (-29.01623785635482 -47.158559321739695)', 'STATUS_1', 6666, 8632, 9542, '2025-12-11 09:13:22.628718 +00:00');
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (2, 755136766, 'POINT (-129.56689057250617 53.65961394926839)', 'STATUS_2', 5595, 4258, 533, '2025-12-11 09:13:37.640976 +00:00');
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (3, 92030123, 'POINT (11.05964276248551 2.8143566255156713)', 'STATUS_3', 5225, 1476, 3750, '2025-12-11 09:13:52.632506 +00:00');
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (4, 883168729, 'POINT (-104.19945639555388 -36.31227256604617)', 'STATUS_4', 6547, 4734, 1045, '2025-12-11 09:14:07.629067 +00:00');
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (5, 851385830, 'POINT (168.45242103624486 -44.45202385317484)', 'STATUS_5', 8090, 9890, 6750, '2025-12-11 09:14:22.626099 +00:00');
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (6, 598693403, 'POINT (78.18423409817728 40.52294043382852)', 'STATUS_6', 5954, 5087, 9486, '2025-12-11 09:14:37.627802 +00:00');
119 changes: 119 additions & 0 deletions pipeline/tests/test_flows/test_last_positions.py
@@ -0,0 +1,119 @@
from datetime import datetime

import pandas as pd
import pytest

from src.flows.last_positions import (
extract_last_positions,
extract_previous_last_positions,
last_positions_flow,
load_last_positions,
validate_action,
)
from src.read_query import read_query

def test_extract_previous_last_positions(reset_test_data):
previous_last_positions = extract_previous_last_positions()
assert previous_last_positions.shape == (6, 9)


def test_extract_last_positions(reset_test_data):
last_positions = extract_last_positions(minutes=15)
assert last_positions.shape == (2, 9)

last_positions = extract_last_positions(minutes=35)
assert last_positions.shape == (4, 9)



def test_load_last_positions(reset_test_data):
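# Smoke test: checks that a well-formed last positions DataFrame loads without raising.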
last_positions_to_load = pd.DataFrame(
{
"id": [13639642, 13640935],
"vessel_id": [1, None],
"mmsi": [123456789, 987654321],
"coord": ["POINT(-2.7335 47.6078)", "POINT(-2.7335 47.6078)"],
"speed": [1, 0],
"course": [302, 0],
"heading": [1, 0],
"ts": [
datetime(2021, 12, 5, 11, 52, 32),
datetime(2018, 12, 5, 11, 52, 32),
],
}
)
load_last_positions(last_positions_to_load)


def test_validate_action():
assert validate_action("update") == "update"
assert validate_action("replace") == "replace"
with pytest.raises(ValueError):
validate_action("unknown_option")

def test_last_positions_flow_resets_last_positions_when_action_is_replace(
reset_test_data,
):
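# With minutes=1200 (20 hours), only the 4 test positions received less than 20 hours
# ago are extracted; in "replace" mode they overwrite the 6 rows previously stored.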
initial_last_positions = read_query(
query="SELECT * FROM last_positions;", db="monitorenv_remote"
)

state = last_positions_flow(
action="replace",
minutes=1200,
return_state=True,
)
assert state.is_completed()

final_last_positions = read_query(
query="SELECT * FROM last_positions;", db="monitorenv_remote"
)

assert len(initial_last_positions) == 6
assert len(final_last_positions) == 4
assert set(final_last_positions.mmsi) == {
545437273,
755136766,
92030123,
883168729,
}
assert set(final_last_positions.vessel_id.dropna()) == {
1,
2,
}


def test_last_positions_flow_updates_last_positions_when_action_is_update(
reset_test_data,
):
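# With minutes=35, 4 fresh positions are extracted; in "update" mode they are merged
# with the previously stored rows, keeping the most recent position per MMSI, so all
# 6 MMSIs remain in the table.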
initial_last_positions = read_query(
query="SELECT * FROM last_positions;", db="monitorenv_remote"
)

state = last_positions_flow(
action="update",
minutes=35,
return_state=True,
)
assert state.is_completed()

final_last_positions = read_query(
query="SELECT * FROM last_positions;", db="monitorenv_remote"
)

assert len(initial_last_positions) == 6
assert len(final_last_positions) == 6
assert set(initial_last_positions.mmsi) == {
545437273,
755136766,
92030123,
883168729,
851385830,
598693403
}
assert set(final_last_positions.vessel_id.dropna()) == {
1,
2,
5,
6
}
8 changes: 8 additions & 0 deletions pipeline/tests/test_flows/test_vessel_repository.py
@@ -2,8 +2,10 @@
from unittest.mock import patch

import pandas as pd
from sqlalchemy import text

from config import TEST_DATA_LOCATION
from src.db_config import create_engine
from src.flows.vessel_repository import (
delete_files,
get_xsd_schema,
@@ -30,6 +32,12 @@ def test_delete_file(tmp_path):


def test_parse_and_load(create_cacem_tables, reset_test_data):

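# Empty the vessels table so the assertions below only reflect vessels parsed from
# the XML file.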
e = create_engine("monitorenv_remote")
with e.begin() as connection:
connection.execute(
text('TRUNCATE vessels;')
)
xml_path = TEST_DATA_LOCATION / "vessel_xml" / "vessel_repository.xml"
xsd_path = TEST_DATA_LOCATION / "vessel_xml" / "vessel_repository.xsd"
schema = get_xsd_schema(xsd_path)
2 changes: 1 addition & 1 deletion pipeline/tests/test_processing.py
@@ -860,4 +860,4 @@ def test_rows_belong_to_sequence(self):
row = np.array([True, True])
res = rows_belong_to_sequence(arr, row, 7)
expected_res = np.array([np.nan, np.nan, np.nan, 0.0, 0.0, 0.0])
np.testing.assert_array_equal(res, expected_res)
np.testing.assert_array_equal(res, expected_res)