Commit
feat: Add message filtering configuration and update filtering logic
osundwajeff committed Feb 17, 2025
1 parent cb680a0 commit 9d2f740
Showing 4 changed files with 180 additions and 79 deletions.
107 changes: 76 additions & 31 deletions django_project/dcas/functions.py
@@ -129,49 +129,68 @@ def calculate_message_output(
return row


def get_last_message_date(
def get_last_message_dates(
farm_id: int,
crop_id: int,
message_code: str,
min_allowed_date: pd.Timestamp,
historical_parquet_path: str
) -> pd.Timestamp:
) -> pd.DataFrame:
"""
Get the last date a message code was sent for a specific farm and crop.
Get all messages for a given farm after min_allowed_date.
:param farm_id: ID of the farm
:type farm_id: int
:param min_allowed_date: Minimum date for filtering messages
:type min_allowed_date: pd.Timestamp
:param historical_parquet_path: Path to historical message parquet file
:type historical_parquet_path: str
:return: Filtered DataFrame containing relevant messages
:rtype: pd.DataFrame
"""
# Read historical messages
historical_data = read_grid_crop_data(historical_parquet_path, [], [])

# Filter messages for the given farm and min_allowed_date
filtered_data = historical_data[

(historical_data['farm_id'] == farm_id) &
(historical_data['message_date'] >= min_allowed_date)
].copy()

return filtered_data


def get_last_message_date(
farm_messages: pd.DataFrame,
crop_id: int,
message_code: str
) -> pd.Timestamp:
"""
Get the last date a message code was sent for a specific crop.
:param farm_messages: Pre-filtered messages for a farm
:type farm_messages: pd.DataFrame
:param crop_id: ID of the crop
:type crop_id: int
:param message_code: The message code to check
:type message_code: str
:return: Timestamp of the last message occurrence or None if not found
:rtype: pd.Timestamp or None
"""
# Read historical messages
historical_data = read_grid_crop_data(
historical_parquet_path, [], [crop_id],
)

# Filter messages for the given farm, crop, and message code
filtered_data = historical_data[
(historical_data['farm_id'] == farm_id) &
(historical_data['crop_id'] == crop_id) &
# Further filter for the specific crop and message_code
filtered_data = farm_messages[
(farm_messages['crop_id'] == crop_id) &
(
(historical_data['message'] == message_code) |
(historical_data['message_2'] == message_code) |
(historical_data['message_3'] == message_code) |
(historical_data['message_4'] == message_code) |
(historical_data['message_5'] == message_code)
(farm_messages['message'] == message_code) |
(farm_messages['message_2'] == message_code) |
(farm_messages['message_3'] == message_code) |
(farm_messages['message_4'] == message_code) |
(farm_messages['message_5'] == message_code)
)
]

# If no record exists, return None
if filtered_data.empty:
return None

# Return the most recent message date
# Return the most recent message date or None if empty
return filtered_data['message_date'].max()
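
For orientation, a minimal usage sketch (not part of the commit) of how the two helpers above are meant to compose: read and pre-filter one farm's history once with get_last_message_dates, then query the resulting frame per crop and message code with get_last_message_date. The farm ID, crop ID and parquet path below are hypothetical, and pd.isna guards against the NaT that .max() can yield on an empty frame:

import pandas as pd

from dcas.functions import get_last_message_date, get_last_message_dates

# Look back two weeks, mirroring the default used by filter_messages_by_weeks.
min_allowed_date = pd.Timestamp.now() - pd.Timedelta(weeks=2)

# One parquet read per farm, reused for every (crop, message code) lookup.
farm_messages = get_last_message_dates(
    farm_id=1,  # hypothetical farm
    min_allowed_date=min_allowed_date,
    historical_parquet_path="/fake/path",  # hypothetical parquet location
)

last_sent = get_last_message_date(farm_messages, crop_id=100, message_code="MSG1")
if last_sent is None or pd.isna(last_sent):
    print("MSG1 has not been sent in the window; safe to send")
else:
    print(f"MSG1 last sent on {last_sent}")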


@@ -192,16 +211,45 @@ def filter_messages_by_weeks(
:return: DataFrame with duplicate messages removed
:rtype: pd.DataFrame
"""
print("Available columns in df:", df.columns) # Debugging line

if 'farm_id' not in df.columns:
df["farm_id"] = df["grid_id"]

# id' is missing in the DataFrame!")

min_allowed_date = (
pd.Timestamp.now() - pd.Timedelta(weeks=weeks_constraint)
)

# Load historical messages once for all farms in df
unique_farm_ids = df['farm_id'].unique()
historical_data = read_grid_crop_data(historical_parquet_path, [], [])

# Filter historical messages for relevant farms and min_allowed_date
historical_data = historical_data[
(historical_data['farm_id'].isin(unique_farm_ids)) &
(historical_data['message_date'] >= min_allowed_date)
]

# Create a lookup dictionary
message_lookup = {}
for _, row in historical_data.iterrows():
farm_id, crop_id = row['farm_id'], row['crop_id']
for message_column in [
'message',
'message_2',
'message_3',
'message_4',
'message_5'
]:
message_code = row[message_column]
if pd.notna(message_code):
message_lookup[(farm_id, crop_id, message_code)] = max(
message_lookup.get(
(farm_id, crop_id, message_code), pd.Timestamp.min),
row['message_date']
)

# Remove messages that have already been sent recently
for idx, row in df.iterrows():
farm_id, crop_id = row['farm_id'], row['crop_id']
for message_column in [
'message',
'message_2',
@@ -214,11 +262,8 @@ def filter_messages_by_weeks(
if pd.isna(message_code):
continue # Skip empty messages

last_sent_date = get_last_message_date(
row['farm_id'],
row['crop_id'],
message_code,
historical_parquet_path)
last_sent_date = message_lookup.get(
(farm_id, crop_id, message_code), None)

if last_sent_date and last_sent_date >= min_allowed_date:
df.at[idx, message_column] = None # Remove duplicate message
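
A self-contained illustration (not part of the commit) of the lookup-dictionary pattern used above: the history is scanned once and keyed by (farm_id, crop_id, message_code), keeping only the latest date, so each outgoing row costs a dictionary lookup instead of another parquet scan. The toy data is hypothetical:

import pandas as pd

history = pd.DataFrame({
    'farm_id': [1, 1, 2],
    'crop_id': [100, 100, 200],
    'message': ['MSG1', 'MSG1', 'MSG2'],
    'message_date': pd.to_datetime(['2025-01-20', '2025-02-10', '2025-01-05']),
})

# Keep only the most recent date per (farm, crop, message code).
lookup = {}
for _, row in history.iterrows():
    key = (row['farm_id'], row['crop_id'], row['message'])
    lookup[key] = max(lookup.get(key, pd.Timestamp.min), row['message_date'])

min_allowed_date = pd.Timestamp('2025-02-17') - pd.Timedelta(weeks=2)
last_sent = lookup.get((1, 100, 'MSG1'))
# True -> MSG1 went out within the last two weeks, so it would be suppressed
print(last_sent is not None and last_sent >= min_allowed_date)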
14 changes: 10 additions & 4 deletions django_project/dcas/pipeline.py
@@ -17,7 +17,10 @@
from django.contrib.gis.db.models import Union
from sqlalchemy import create_engine

from gap.models import FarmRegistry, Grid, CropGrowthStage
from gap.models import (
FarmRegistry, Grid, CropGrowthStage,
Preferences
)
from dcas.models import DCASConfig, DCASConfigCountry
from dcas.partitions import (
process_partition_total_gdd,
@@ -442,6 +445,8 @@ def process_farm_registry_data(self):
self.duck_db_num_threads,
meta=farm_df_meta
)
if Preferences.load().enable_message_filtering:
self.filter_message_output()

self.data_output.save(OutputType.FARM_CROP_DATA, farm_df)
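
Message filtering is now gated behind a site-level preference. A sketch of flipping the flag from a Django shell, assuming Preferences is the singleton imported from gap.models above and enable_message_filtering is the boolean field added alongside this change:

from gap.models import Preferences

prefs = Preferences.load()  # singleton accessor, as used by the pipeline
prefs.enable_message_filtering = True  # assumed BooleanField on Preferences
prefs.save()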

@@ -495,11 +500,14 @@ def filter_message_output(self):
"message_5": "object",
"message_date": "datetime64[ns]",
}
data_parquet_path = self.data_output._get_directory_path(
self.data_output.DCAS_OUTPUT_DIR
) + '/iso_a3=*/year=*/month=*/day=*/*.parquet'

# Apply message filtering
df = df.map_partitions(
filter_messages_by_weeks,
self.data_output.grid_crop_data_path,
data_parquet_path,
2, # Weeks constraint (default: 2 weeks)
meta=meta
)
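
For reference (not part of the commit), the glob assembled above walks the Hive-style partition directories of the DCAS output; with a hypothetical base directory it expands to something like:

base_dir = '/tmp/dcas_output'  # hypothetical value of _get_directory_path(DCAS_OUTPUT_DIR)
data_parquet_path = base_dir + '/iso_a3=*/year=*/month=*/day=*/*.parquet'
# would match e.g. /tmp/dcas_output/iso_a3=KEN/year=2025/month=2/day=17/part-0.parquet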
@@ -523,8 +531,6 @@ def run(self):
self.process_grid_crop_data()
self.process_farm_registry_data()

self.filter_message_output()

self.extract_csv_output()

self.cleanup_gdd_matrix()

132 changes: 88 additions & 44 deletions django_project/dcas/tests/test_pipeline_functions.py
@@ -239,7 +239,7 @@ def test_get_last_message_date_exists(self, mock_read_grid_crop_data):
'message_4': [None, None, None, None, None, None],
'message_5': [None, None, None, 'MSG1', None, 'MSG4'],
'message_date': [
now - timedelta(days=15), # MSG1 - Old
now - timedelta(days=15), # MSG1 - Oldest farm 1, crop 100
now - timedelta(days=10), # MSG2
now - timedelta(days=5), # MSG1 - More recent
now - timedelta(days=12), # MSG3
@@ -248,60 +248,74 @@
]
})

# Simulate `read_grid_crop_data` returning the dataset
mock_read_grid_crop_data.return_value = mock_data

# Latest MSG1 should be at index 4 (3 days ago)
result = get_last_message_date(2, 101, "MSG1", "/fake/path")
assert result == mock_data['message_date'].iloc[4]
# Pre-filter messages for farm 2
farm_messages_farm_2 = mock_data[mock_data["farm_id"] == 2]

# Latest MSG3 should be at index 3 (12 days ago)
result = get_last_message_date(2, 101, "MSG3", "/fake/path")
assert result == mock_data['message_date'].iloc[4]
# Latest MSG1 for farm 2, crop 101 should be at index 4 (3 days ago)
result = get_last_message_date(farm_messages_farm_2, 101, "MSG1")
self.assertEqual(result, mock_data['message_date'].iloc[4])

# Latest MSG2 should be at index 1 (10 days ago)
result = get_last_message_date(1, 100, "MSG2", "/fake/path")
assert result == mock_data['message_date'].iloc[1]
# Latest MSG3 for farm 2, crop 101 should be at index 3 (12 days ago)
result = get_last_message_date(farm_messages_farm_2, 101, "MSG3")
self.assertEqual(result, mock_data['message_date'].iloc[4])

# Pre-filter messages for farm 1
farm_messages_farm_1 = mock_data[mock_data["farm_id"] == 1]

# Latest MSG2 for farm 1, crop 100 should be at index 1 (10 days ago)
result = get_last_message_date(farm_messages_farm_1, 100, "MSG2")
self.assertEqual(result, mock_data['message_date'].iloc[1])

# Latest MSG1 for farm 1, crop 100 should be at index 2 (5 days ago)
result = get_last_message_date(1, 100, "MSG1", "/fake/path")
assert result == mock_data['message_date'].iloc[2]
result = get_last_message_date(farm_messages_farm_1, 100, "MSG1")
self.assertEqual(result, mock_data['message_date'].iloc[2])

# MSG5 exists only once, at index 3 (12 days ago)
result = get_last_message_date(2, 101, "MSG5", "/fake/path")
assert result is None # No MSG5 found
# MSG5 does not exist in the dataset for farm 2, crop 101
result = get_last_message_date(farm_messages_farm_2, 101, "MSG5")
self.assertIsNone(result)

@patch("dcas.functions.read_grid_crop_data")
def test_get_last_message_date_not_exists(self, mock_read_grid_crop_data):
"""Test when the message does not exist in history."""
now = pd.Timestamp(datetime.now())

# Mock DataFrame with different messages, but not "MSG1"
mock_data = pd.DataFrame({
'farm_id': [1, 1, 2],
'crop_id': [100, 100, 101],
'message': ['MSG2', 'MSG3', 'MSG4'],
'message_2': [None, None, None],
'message_3': [None, None, None],
'message_4': [None, None, None],
'message_5': [None, None, None],
'message_2': ['MSG5', None, None], # Different message
'message_3': [None, 'MSG6', None], # Different message
'message_4': [None, None, 'MSG7'], # Different message
'message_5': [None, None, None], # No relevant messages
'message_date': [
pd.Timestamp(datetime.now() - timedelta(days=10)),
pd.Timestamp(datetime.now() - timedelta(days=5)),
pd.Timestamp(datetime.now() - timedelta(days=3))
now - timedelta(days=10), # MSG2
now - timedelta(days=5), # MSG3
now - timedelta(days=3) # MSG4
]
})

mock_read_grid_crop_data.return_value = mock_data

result = get_last_message_date(1, 100, "MSG1", "/fake/path")
# Attempting to get "MSG1", which is not present in the history
result = get_last_message_date(mock_data, 100, "MSG1")

# Ensure that the function correctly returns None
self.assertIsNone(result)

@patch("dcas.functions.read_grid_crop_data")
def test_get_last_message_date_multiple_messages(
self,
mock_read_grid_crop_data
self, mock_read_grid_crop_data
):
"""
Test when the same message appears multiple times.
And should return the most recent timestamp.
It should return the most recent timestamp.
"""
# Mock DataFrame representing historical messages
mock_data = pd.DataFrame({
'farm_id': [1, 1, 1],
'crop_id': [100, 100, 100],
@@ -311,22 +325,38 @@
'message_4': [None, None, None],
'message_5': [None, None, None],
'message_date': [
pd.Timestamp(datetime.now() - timedelta(days=15)),
pd.Timestamp(datetime.now() - timedelta(days=7)),
pd.Timestamp(datetime.now() - timedelta(days=2))
pd.Timestamp(datetime.now() - timedelta(days=15)), # Oldest
pd.Timestamp(datetime.now() - timedelta(days=7)), # Middle
pd.Timestamp(datetime.now() - timedelta(days=2)) # recent
]
})

# Mock return value for read_grid_crop_data
mock_read_grid_crop_data.return_value = mock_data

result = get_last_message_date(1, 100, "MSG1", "/fake/path")
self.assertEqual(result, mock_data['message_date'].iloc[2])
# Pre-filter data to simulate getting farm messages
farm_messages = mock_data[mock_data["farm_id"] == 1]

# Call function with the updated parameters
result = get_last_message_date(farm_messages, 100, "MSG1")

@patch("dcas.functions.get_last_message_date")
def test_filter_messages_by_weeks(self, mock_get_last_message_date):
# Expected result: Most recent message date
expected_result = mock_data['message_date'].max()

# Assertions
self.assertEqual(
result,
expected_result,
f"Expected {expected_result}, but got {result}"
)

@patch("dcas.functions.read_grid_crop_data")
def test_filter_messages_by_weeks(self, mock_read_grid_crop_data):
"""Test filtering messages based on the time constraint (weeks)."""
test_weeks = 2 # Remove messages sent within the last 2 weeks
current_date = pd.Timestamp(datetime.now())
current_date = pd.Timestamp(datetime.now()) # Fixed datetime

# Mock input DataFrame (new messages)
df = pd.DataFrame({
'farm_id': [1, 2, 3],
'crop_id': [100, 200, 300],
@@ -337,16 +367,30 @@
'message_5': [None, None, None],
})

# Simulating last message dates for each row
mock_get_last_message_date.side_effect = [
current_date - timedelta(weeks=1), # Should be removed
current_date - timedelta(weeks=3), # Should stay
None # No history, should stay
]
# Mock historical messages (Parquet data)
historical_df = pd.DataFrame({
'farm_id': [1, 2], # Only farms 1 and 2 have historical messages
'crop_id': [100, 200],
'message': ['MSG1', 'MSG2'],
'message_2': [None, None],
'message_3': [None, None],
'message_4': [None, None],
'message_5': [None, None],
'message_date': [
current_date - timedelta(weeks=1), # Recent
current_date - timedelta(weeks=3)], # Older
})

# Mock `read_grid_crop_data` to return the historical messages
mock_read_grid_crop_data.return_value = historical_df

# Run function
filtered_df = filter_messages_by_weeks(df, "/fake/path", test_weeks)

# Assert that the correct messages were removed
self.assertIsNone(filtered_df.loc[0, 'message']) # Removed
self.assertEqual(filtered_df.loc[1, 'message'], 'MSG2') # Kept
self.assertEqual(filtered_df.loc[2, 'message'], 'MSG3') # Kept
# Assertions
self.assertIsNone(filtered_df.loc[0, 'message'])
self.assertEqual(filtered_df.loc[1, 'message'], 'MSG2')
self.assertEqual(filtered_df.loc[2, 'message'], 'MSG3')

# Ensure `read_grid_crop_data` was called once
mock_read_grid_crop_data.assert_called_once_with("/fake/path", [], [])

0 comments on commit 9d2f740
