Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ingest DCAS Farmer Registry data with batch processing #422

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 119 additions & 32 deletions django_project/gap/ingestor/farm_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@
class DCASFarmRegistryIngestor(BaseIngestor):
"""Ingestor for DCAS Farmer Registry data."""

BATCH_SIZE = 100000

def __init__(self, session: IngestorSession, working_dir='/tmp'):
"""Initialize the ingestor with session and working directory.

Expand All @@ -114,6 +116,16 @@
# Placeholder for the group created during this session
self.group = None

self.farm_list = []
self.registry_list = []
# Initialize lookup dictionaries
self.crop_lookup = {
c.name.lower(): c for c in Crop.objects.all()
}
self.stage_lookup = {
s.name.lower(): s for s in CropStageType.objects.all()
}

def _extract_zip_file(self):
"""Extract the ZIP file to a temporary directory."""
dir_path = os.path.join(self.working_dir, str(uuid.uuid4()))
Expand All @@ -134,7 +146,10 @@

def _create_registry_group(self):
"""Create a new FarmRegistryGroup."""
group_name = "farm_registry_" + \
datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')
self.group = self.group_model.objects.create(
name=group_name,
date_time=datetime.now(timezone.utc),
is_latest=True
)
Expand All @@ -153,52 +168,111 @@
logger.error(f"Invalid date format: {date_str}")
return None # Return None for invalid dates

def _bulk_insert_farms_and_registries(self):
    """Flush the buffered rows: bulk insert Farms first, then FarmRegistry.

    Does nothing when ``self.farm_list`` is empty. Otherwise the buffered
    farm dictionaries are inserted, the matching ``Farm`` rows are read
    back by ``unique_id``, and one ``FarmRegistry`` is built per buffered
    registry entry whose farm could be found. Both inserts run inside a
    single ``transaction.atomic()`` block.

    Any exception is logged and recorded on the session (status set to
    FAILED, message stored in ``notes``); it is not re-raised. The
    buffers are emptied whether the flush succeeded or failed.
    """
    if not self.farm_list:
        return

    try:
        with transaction.atomic():  # Ensure database consistency
            # Step 1: Bulk insert farms; rows that conflict with existing
            # ones are ignored.
            Farm.objects.bulk_create(
                [Farm(**data) for data in self.farm_list],
                ignore_conflicts=True
            )

            # Step 2: Fetch the farms of this batch, keyed by unique_id,
            # so registries can reference the saved instances.
            farm_map = {
                farm.unique_id: farm
                for farm in Farm.objects.filter(
                    unique_id__in=[
                        data["unique_id"] for data in self.farm_list
                    ]
                )
            }

            # Step 3: Prepare FarmRegistry objects using mapped farms
            registries = []
            for data in self.registry_list:
                farm_instance = farm_map.get(data["farm_unique_id"])
                if not farm_instance:
                    continue  # Skip if farm does not exist

                registries.append(FarmRegistry(
                    group=data["group"],
                    farm=farm_instance,
                    crop=data["crop"],
                    crop_stage_type=data["crop_stage_type"],
                    planting_date=data["planting_date"],
                ))

            # Step 4: Bulk insert FarmRegistry only if valid records exist
            if registries:
                FarmRegistry.objects.bulk_create(
                    registries, ignore_conflicts=True
                )

    except Exception as e:
        logger.error(f"Bulk insert failed due to {e}")
        self.session.status = IngestorSessionStatus.FAILED
        self.session.notes = str(e)
        self.session.save()

    finally:
        # Clear the batch lists on failure too: a batch left in place
        # would be re-submitted in full on every later flush and the
        # buffers would keep growing past BATCH_SIZE.
        self.farm_list.clear()
        self.registry_list.clear()

Check warning on line 223 in django_project/gap/ingestor/farm_registry.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/ingestor/farm_registry.py#L219-L223

Added lines #L219 - L223 were not covered by tests

def _process_row(self, row):
"""Process a single row from the input file."""
"""Process a single row from the CSV file."""
try:
# Parse latitude and longitude to create a geometry point
latitude = float(row[Keys.FINAL_LATITUDE])
longitude = float(row[Keys.FINAL_LONGITUDE])
point = Point(x=longitude, y=latitude, srid=4326)

# get crop and stage type
crop_key = Keys.get_crop_key(row)
crop_with_stage = row[crop_key].lower().split('_')
crop, _ = Crop.objects.get_or_create(
name__iexact=crop_with_stage[0],
defaults={
'name': crop_with_stage[0].title()
}
)
stage_type = CropStageType.objects.get(
Q(name__iexact=crop_with_stage[1]) |
Q(alias__iexact=crop_with_stage[1])
)
crop_name = row[crop_key].lower().split('_')[0]

# Parse planting date dynamically
planting_date_key = Keys.get_planting_date_key(row)
planting_date = self._parse_planting_date(row[planting_date_key])
crop = self.crop_lookup.get(crop_name)
if not crop:
crop, _ = Crop.objects.get_or_create(name__iexact=crop_name)
self.crop_lookup[crop_name] = crop

# Get or create the Farm instance
farmer_id_key = Keys.get_farm_id_key(row)
farm, _ = Farm.objects.get_or_create(
unique_id=row[farmer_id_key].strip(),
defaults={
'geometry': point,
'crop': crop
}
stage_type = self.stage_lookup.get(
row[crop_key].lower().split('_')[1]
)
if not stage_type:
stage_type = CropStageType.objects.filter(

Check warning on line 244 in django_project/gap/ingestor/farm_registry.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/ingestor/farm_registry.py#L244

Added line #L244 was not covered by tests
Q(name__iexact=row[crop_key]) | Q(
alias__iexact=row[crop_key])
).first()
self.stage_lookup[row[crop_key]] = stage_type

Check warning on line 248 in django_project/gap/ingestor/farm_registry.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/ingestor/farm_registry.py#L248

Added line #L248 was not covered by tests

# Create the FarmRegistry entry
FarmRegistry.objects.update_or_create(
group=self.group,
farm=farm,
crop=crop,
crop_stage_type=stage_type,
planting_date=planting_date,
farmer_id_key = Keys.get_farm_id_key(row)
# Store farm data as a dictionary
farm_data = {
"unique_id": row[farmer_id_key].strip(),
"geometry": point,
"crop": crop
}
self.farm_list.append(farm_data)

planting_date = self._parse_planting_date(
row[Keys.get_planting_date_key(row)]
)

registry_data = {
"group": self.group,
"farm_unique_id": row[farmer_id_key].strip(),
"crop": crop,
"crop_stage_type": stage_type,
"planting_date": planting_date,
}
self.registry_list.append(registry_data)

# Batch process every BATCH_SIZE records
if len(self.farm_list) >= self.BATCH_SIZE:
self._bulk_insert_farms_and_registries()

Check warning on line 274 in django_project/gap/ingestor/farm_registry.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/ingestor/farm_registry.py#L274

Added line #L274 was not covered by tests

except Exception as e:
logger.error(f"Error processing row: {row} - {e}")

Expand Down Expand Up @@ -254,5 +328,18 @@
dir_path = self._extract_zip_file()
try:
self._run(dir_path)

# Final batch insert (ensures all remaining farms are inserted)
if self.farm_list:
self._bulk_insert_farms_and_registries()

# If no errors occurred in `_run()`, mark as SUCCESS
if self.session.status != IngestorSessionStatus.FAILED:
self.session.status = IngestorSessionStatus.SUCCESS
except Exception as e:
self.session.status = IngestorSessionStatus.FAILED
self.session.notes = str(e)
finally:
shutil.rmtree(dir_path)
self.session.end_at = datetime.now(timezone.utc)
self.session.save()
Binary file not shown.
71 changes: 55 additions & 16 deletions django_project/gap/tests/ingestor/test_farm_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
import os
import logging
import unittest
from unittest.mock import patch
from datetime import date
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase
from gap.models import (
Farm, Crop, FarmRegistry, FarmRegistryGroup,
Farm, FarmRegistry, FarmRegistryGroup,
IngestorSession, IngestorSessionStatus
)
from gap.ingestor.farm_registry import (
Expand Down Expand Up @@ -46,26 +47,26 @@ def setUp(self):
'farm_registry',
'test_farm_registry.zip' # Pre-existing ZIP file
)

def test_successful_ingestion(self):
"""Test successful ingestion of farmer registry data."""
with open(self.test_zip_path, 'rb') as _file:
test_file = SimpleUploadedFile(_file.name, _file.read())
self.test_file = SimpleUploadedFile(_file.name, _file.read())

session = IngestorSession.objects.create(
file=test_file,
self.session = IngestorSession.objects.create(
file=self.test_file,
ingestor_type='Farm Registry',
trigger_task=False
)

ingestor = DCASFarmRegistryIngestor(session)
ingestor.run()
self.ingestor = DCASFarmRegistryIngestor(self.session)

def test_successful_ingestion(self):
"""Test successful ingestion of farmer registry data."""
self.ingestor.run()

# Verify session status
session.refresh_from_db()
print(session.status, session.notes)
self.session.refresh_from_db()
print(self.session.status, self.session.notes)
self.assertEqual(
session.status,
self.session.status,
IngestorSessionStatus.SUCCESS,
"Session status should be SUCCESS."
)
Expand All @@ -84,9 +85,47 @@ def test_successful_ingestion(self):
self.assertEqual(farm.geometry.x, 36.8219)
self.assertEqual(farm.geometry.y, -1.2921)

# Verify Crop details
crop = Crop.objects.get(name='Maize')
self.assertIsNotNone(crop)
def test_bulk_insert_with_empty_farm_list(self):
    """Check that an empty `farm_list` never reaches `bulk_create`."""
    # Start from an empty buffer so the flush has nothing to insert
    self.ingestor.farm_list = []

    # Replace Farm's bulk_create with a mock for the duration of the flush
    bulk_create_patcher = patch("gap.models.Farm.objects.bulk_create")
    with bulk_create_patcher as patched_bulk_create:
        self.ingestor._bulk_insert_farms_and_registries()

        # The flush must have returned without inserting anything
        patched_bulk_create.assert_not_called()

@patch(
    "gap.ingestor.farm_registry.DCASFarmRegistryIngestor._run",
    side_effect=Exception("Fatal error")
)
def test_run_failure_sets_failed_status(self, mock_run):
    """Check that an exception raised by `_run` marks the session FAILED."""
    # `_run` is patched to raise, so `run()` has to handle the error
    self.ingestor.run()

    # Reload the persisted session before inspecting it
    session = self.session
    session.refresh_from_db()

    # The failure and its message must be recorded on the session
    self.assertEqual(session.status, IngestorSessionStatus.FAILED)
    self.assertIn("Fatal error", session.notes)

def test_stage_lookup_population(self):
    """Ensure the stage of a processed row is present in `stage_lookup`."""
    # Build a single input row. The coordinates mirror the farm asserted
    # in `test_successful_ingestion` (x/longitude 36.8219,
    # y/latitude -1.2921).
    row = {
        'CropName': 'Maize_Mid',
        'FarmerId': 'F100',
        'FinalLatitude': '-1.2921',
        'FinalLongitude': '36.8219',
        'PlantingDate': '2024-01-01'
    }

    self.ingestor._process_row(row)

    # `_process_row` looks the stage up by the lower-cased text after "_"
    crop_stage_key = "mid"
    self.assertIn(crop_stage_key, self.ingestor.stage_lookup)


class TestKeysStaticMethods(unittest.TestCase):
Expand All @@ -97,7 +136,7 @@ def test_get_crop_key(self):
self.assertEqual(
Keys.get_crop_key({'CropName': 'Maize'}), 'CropName')
self.assertEqual(
Keys.get_crop_key({'crop': 'Wheat'}), 'crop')
Keys.get_crop_key({'crop': 'Cassava'}), 'crop')
with self.assertRaises(KeyError):
Keys.get_crop_key({'wrong_key': 'Soybean'})

Expand Down
Loading