Skip to content

Commit

Permalink
feat: Ingest DCAS Farmer Registry data with batch processing (#422)
Browse files Browse the repository at this point in the history
* feat: Ingest DCAS Farmer Registry data with batch processing

* fix flake8

* Refactor farm registry bulk insert and error handling

* Add test for farm registry ingestor
  • Loading branch information
osundwajeff authored Feb 5, 2025
1 parent 8e7ab25 commit dad4f7e
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 48 deletions.
151 changes: 119 additions & 32 deletions django_project/gap/ingestor/farm_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def get_farm_id_key(row):
class DCASFarmRegistryIngestor(BaseIngestor):
"""Ingestor for DCAS Farmer Registry data."""

BATCH_SIZE = 100000

def __init__(self, session: IngestorSession, working_dir='/tmp'):
"""Initialize the ingestor with session and working directory.
Expand All @@ -114,6 +116,16 @@ def __init__(self, session: IngestorSession, working_dir='/tmp'):
# Placeholder for the group created during this session
self.group = None

self.farm_list = []
self.registry_list = []
# Initialize lookup dictionaries
self.crop_lookup = {
c.name.lower(): c for c in Crop.objects.all()
}
self.stage_lookup = {
s.name.lower(): s for s in CropStageType.objects.all()
}

def _extract_zip_file(self):
"""Extract the ZIP file to a temporary directory."""
dir_path = os.path.join(self.working_dir, str(uuid.uuid4()))
Expand All @@ -134,7 +146,10 @@ def _extract_zip_file(self):

def _create_registry_group(self):
    """Create a new FarmRegistryGroup marked as the latest.

    The created group is stored on ``self.group`` so subsequent rows
    processed in this session can reference it. The group's name embeds
    its creation timestamp.
    """
    # Capture the timestamp once so the name and the date_time field
    # cannot disagree (two separate datetime.now() calls could straddle
    # a minute boundary and produce mismatched values).
    created_at = datetime.now(timezone.utc)
    group_name = "farm_registry_" + created_at.strftime('%Y-%m-%d %H:%M')
    self.group = self.group_model.objects.create(
        name=group_name,
        date_time=created_at,
        is_latest=True
    )
Expand All @@ -153,52 +168,111 @@ def _parse_planting_date(self, date_str):
logger.error(f"Invalid date format: {date_str}")
return None # Return None for invalid dates

def _bulk_insert_farms_and_registries(self):
    """Bulk insert buffered Farms first, then their FarmRegistry rows.

    Farms are inserted with conflicts ignored and then re-fetched so the
    registry rows can reference saved primary keys. On failure the
    session is marked FAILED. The batch buffers are cleared in all
    cases so a bad batch is not silently re-attempted on later flushes.
    """
    if not self.farm_list:
        return  # nothing buffered for this batch

    try:
        with transaction.atomic():  # all-or-nothing per batch
            # Step 1: insert farms; existing unique_ids are skipped.
            Farm.objects.bulk_create(
                [Farm(**data) for data in self.farm_list],
                ignore_conflicts=True
            )

            # Step 2: re-fetch inserted/existing farms so we have
            # primary keys (bulk_create with ignore_conflicts does not
            # reliably populate pks on the in-memory instances).
            farm_map = {
                farm.unique_id: farm
                for farm in Farm.objects.filter(
                    unique_id__in=[
                        data["unique_id"] for data in self.farm_list
                    ]
                )
            }

            # Step 3: build FarmRegistry rows against the mapped farms.
            registries = []
            for data in self.registry_list:
                farm_instance = farm_map.get(data["farm_unique_id"])
                if not farm_instance:
                    continue  # skip rows whose farm is missing

                registries.append(FarmRegistry(
                    group=data["group"],
                    farm=farm_instance,
                    crop=data["crop"],
                    crop_stage_type=data["crop_stage_type"],
                    planting_date=data["planting_date"],
                ))

            # Step 4: bulk insert registries only if any remain.
            if registries:
                FarmRegistry.objects.bulk_create(
                    registries, ignore_conflicts=True
                )

    except Exception as e:
        logger.error(f"Bulk insert failed due to {e}")
        self.session.status = IngestorSessionStatus.FAILED
        self.session.notes = str(e)
        self.session.save()
    finally:
        # FIX: clear the buffers even when the insert fails. Previously
        # a failed batch stayed in the lists, so every later flush
        # retried it (and the buffers grew without bound).
        self.farm_list.clear()
        self.registry_list.clear()

def _process_row(self, row):
"""Process a single row from the input file."""
"""Process a single row from the CSV file."""
try:
# Parse latitude and longitude to create a geometry point
latitude = float(row[Keys.FINAL_LATITUDE])
longitude = float(row[Keys.FINAL_LONGITUDE])
point = Point(x=longitude, y=latitude, srid=4326)

# get crop and stage type
crop_key = Keys.get_crop_key(row)
crop_with_stage = row[crop_key].lower().split('_')
crop, _ = Crop.objects.get_or_create(
name__iexact=crop_with_stage[0],
defaults={
'name': crop_with_stage[0].title()
}
)
stage_type = CropStageType.objects.get(
Q(name__iexact=crop_with_stage[1]) |
Q(alias__iexact=crop_with_stage[1])
)
crop_name = row[crop_key].lower().split('_')[0]

# Parse planting date dynamically
planting_date_key = Keys.get_planting_date_key(row)
planting_date = self._parse_planting_date(row[planting_date_key])
crop = self.crop_lookup.get(crop_name)
if not crop:
crop, _ = Crop.objects.get_or_create(name__iexact=crop_name)
self.crop_lookup[crop_name] = crop

# Get or create the Farm instance
farmer_id_key = Keys.get_farm_id_key(row)
farm, _ = Farm.objects.get_or_create(
unique_id=row[farmer_id_key].strip(),
defaults={
'geometry': point,
'crop': crop
}
stage_type = self.stage_lookup.get(
row[crop_key].lower().split('_')[1]
)
if not stage_type:
stage_type = CropStageType.objects.filter(
Q(name__iexact=row[crop_key]) | Q(
alias__iexact=row[crop_key])
).first()
self.stage_lookup[row[crop_key]] = stage_type

# Create the FarmRegistry entry
FarmRegistry.objects.update_or_create(
group=self.group,
farm=farm,
crop=crop,
crop_stage_type=stage_type,
planting_date=planting_date,
farmer_id_key = Keys.get_farm_id_key(row)
# Store farm data as a dictionary
farm_data = {
"unique_id": row[farmer_id_key].strip(),
"geometry": point,
"crop": crop
}
self.farm_list.append(farm_data)

planting_date = self._parse_planting_date(
row[Keys.get_planting_date_key(row)]
)

registry_data = {
"group": self.group,
"farm_unique_id": row[farmer_id_key].strip(),
"crop": crop,
"crop_stage_type": stage_type,
"planting_date": planting_date,
}
self.registry_list.append(registry_data)

# Batch process every BATCH_SIZE records
if len(self.farm_list) >= self.BATCH_SIZE:
self._bulk_insert_farms_and_registries()

except Exception as e:
logger.error(f"Error processing row: {row} - {e}")

Expand Down Expand Up @@ -254,5 +328,18 @@ def run(self):
dir_path = self._extract_zip_file()
try:
self._run(dir_path)

# Final batch insert (ensures all remaining farms are inserted)
if self.farm_list:
self._bulk_insert_farms_and_registries()

# If no errors occurred in `_run()`, mark as SUCCESS
if self.session.status != IngestorSessionStatus.FAILED:
self.session.status = IngestorSessionStatus.SUCCESS
except Exception as e:
self.session.status = IngestorSessionStatus.FAILED
self.session.notes = str(e)
finally:
shutil.rmtree(dir_path)
self.session.end_at = datetime.now(timezone.utc)
self.session.save()
Binary file not shown.
71 changes: 55 additions & 16 deletions django_project/gap/tests/ingestor/test_farm_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
import os
import logging
import unittest
from unittest.mock import patch
from datetime import date
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase
from gap.models import (
Farm, Crop, FarmRegistry, FarmRegistryGroup,
Farm, FarmRegistry, FarmRegistryGroup,
IngestorSession, IngestorSessionStatus
)
from gap.ingestor.farm_registry import (
Expand Down Expand Up @@ -46,26 +47,26 @@ def setUp(self):
'farm_registry',
'test_farm_registry.zip' # Pre-existing ZIP file
)

def test_successful_ingestion(self):
"""Test successful ingestion of farmer registry data."""
with open(self.test_zip_path, 'rb') as _file:
test_file = SimpleUploadedFile(_file.name, _file.read())
self.test_file = SimpleUploadedFile(_file.name, _file.read())

session = IngestorSession.objects.create(
file=test_file,
self.session = IngestorSession.objects.create(
file=self.test_file,
ingestor_type='Farm Registry',
trigger_task=False
)

ingestor = DCASFarmRegistryIngestor(session)
ingestor.run()
self.ingestor = DCASFarmRegistryIngestor(self.session)

def test_successful_ingestion(self):
"""Test successful ingestion of farmer registry data."""
self.ingestor.run()

# Verify session status
session.refresh_from_db()
print(session.status, session.notes)
self.session.refresh_from_db()
print(self.session.status, self.session.notes)
self.assertEqual(
session.status,
self.session.status,
IngestorSessionStatus.SUCCESS,
"Session status should be SUCCESS."
)
Expand All @@ -84,9 +85,47 @@ def test_successful_ingestion(self):
self.assertEqual(farm.geometry.x, 36.8219)
self.assertEqual(farm.geometry.y, -1.2921)

# Verify Crop details
crop = Crop.objects.get(name='Maize')
self.assertIsNotNone(crop)
def test_bulk_insert_with_empty_farm_list(self):
    """Test `_bulk_insert_farms_and_registries()`."""
    # An empty buffer must short-circuit before touching the database.
    self.ingestor.farm_list = []

    with patch("gap.models.Farm.objects.bulk_create") as mock_bulk_create:
        self.ingestor._bulk_insert_farms_and_registries()

    # The early return means bulk_create is never reached.
    mock_bulk_create.assert_not_called()

@patch(
    "gap.ingestor.farm_registry.DCASFarmRegistryIngestor._run",
    side_effect=Exception("Fatal error")
)
def test_run_failure_sets_failed_status(self, mock_run):
    """Ensure session status is marked as FAILED."""
    # `_run` is patched to blow up, so run() must take its error path.
    self.ingestor.run()

    # Reload from the database and verify the failure was recorded.
    session = self.session
    session.refresh_from_db()
    self.assertEqual(session.status, IngestorSessionStatus.FAILED)
    self.assertIn("Fatal error", session.notes)

def test_stage_lookup_population(self):
    """Ensure `_process_row` correctly updates `stage_lookup`."""
    # One CSV-style record; the crop column carries "<crop>_<stage>".
    sample_row = {
        'CropName': 'Maize_Mid',
        'FarmerId': 'F100',
        'FinalLatitude': '36.8219',
        'FinalLongitude': '-1.2921',
        'PlantingDate': '2024-01-01',
    }

    self.ingestor._process_row(sample_row)

    # The stage portion of the crop column must now be cached.
    self.assertIn("mid", self.ingestor.stage_lookup)


class TestKeysStaticMethods(unittest.TestCase):
Expand All @@ -97,7 +136,7 @@ def test_get_crop_key(self):
self.assertEqual(
Keys.get_crop_key({'CropName': 'Maize'}), 'CropName')
self.assertEqual(
Keys.get_crop_key({'crop': 'Wheat'}), 'crop')
Keys.get_crop_key({'crop': 'Cassava'}), 'crop')
with self.assertRaises(KeyError):
Keys.get_crop_key({'wrong_key': 'Soybean'})

Expand Down

0 comments on commit dad4f7e

Please sign in to comment.