Skip to content

Commit

Permalink
Merge branch 'main' into feat-message-management
Browse files Browse the repository at this point in the history
  • Loading branch information
osundwajeff committed Feb 10, 2025
2 parents 8bf0623 + dad4f7e commit 1d6760e
Show file tree
Hide file tree
Showing 24 changed files with 966 additions and 190 deletions.
2 changes: 1 addition & 1 deletion django_project/_version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.13
0.0.14
89 changes: 88 additions & 1 deletion django_project/core/tests/test_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

from core.utils.date import (
find_max_min_epoch_dates,
split_epochs_by_year
split_epochs_by_year,
split_epochs_by_year_month
)


Expand Down Expand Up @@ -178,3 +179,89 @@ def test_same_start_and_end(self):
self.assertEqual(
split_epochs_by_year(int(start_epoch), int(start_epoch)), expected
)


class TestSplitEpochsByYearMonth(TestCase):
    """Test method split_epochs_by_year_month."""

    @staticmethod
    def _ts(*args):
        """Return the integer UTC epoch for ``datetime(*args)``."""
        return int(datetime(*args, tzinfo=timezone.utc).timestamp())

    def test_same_month(self):
        """A range inside one month yields a single segment."""
        begin = self._ts(2023, 5, 10)
        finish = self._ts(2023, 5, 25)
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            [(2023, 5, begin, finish)]
        )

    def test_crossing_two_months(self):
        """A range spanning two months splits at the month boundary."""
        begin = self._ts(2023, 11, 20)
        finish = self._ts(2023, 12, 10)
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            [
                (2023, 11, begin, self._ts(2023, 11, 30, 23, 59, 59)),
                (2023, 12, self._ts(2023, 12, 1), finish),
            ]
        )

    def test_crossing_year_boundary(self):
        """A range crossing December into January splits correctly."""
        begin = self._ts(2023, 12, 20)
        finish = self._ts(2024, 1, 10)
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            [
                (2023, 12, begin, self._ts(2023, 12, 31, 23, 59, 59)),
                (2024, 1, self._ts(2024, 1, 1), finish),
            ]
        )

    def test_multiple_years_and_months(self):
        """A multi-month, multi-year range yields one segment per month."""
        begin = self._ts(2022, 10, 15)
        finish = self._ts(2023, 2, 20)
        expected = [
            (2022, 10, begin, self._ts(2022, 10, 31, 23, 59, 59)),
            (2022, 11,
             self._ts(2022, 11, 1), self._ts(2022, 11, 30, 23, 59, 59)),
            (2022, 12,
             self._ts(2022, 12, 1), self._ts(2022, 12, 31, 23, 59, 59)),
            (2023, 1,
             self._ts(2023, 1, 1), self._ts(2023, 1, 31, 23, 59, 59)),
            (2023, 2, self._ts(2023, 2, 1), finish),
        ]
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            expected
        )

    def test_same_start_and_end(self):
        """Identical start and end collapse to one zero-length segment."""
        moment = self._ts(2023, 7, 15)
        self.assertEqual(
            split_epochs_by_year_month(moment, moment),
            [(2023, 7, moment, moment)]
        )
49 changes: 49 additions & 0 deletions django_project/core/utils/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,52 @@ def split_epochs_by_year(start_epoch, end_epoch):
current_year += 1

return results


def split_epochs_by_year_month(start_epoch, end_epoch):
    """Split an epoch range into contiguous per-month segments.

    Each interior month spans from the first second of the month to the
    last second of the month (one second before the first second of the
    following month); the first and last segments are clamped to
    ``start_epoch`` / ``end_epoch``.

    :param start_epoch: Start date time in epoch
    :type start_epoch: int
    :param end_epoch: End date time in epoch
    :type end_epoch: int
    :return: List of (year, month, start_epoch, end_epoch)
    :rtype: list
    """
    results = []
    start_dt = datetime.fromtimestamp(start_epoch, tz=timezone.utc)
    end_dt = datetime.fromtimestamp(end_epoch, tz=timezone.utc)

    current_year, current_month = start_dt.year, start_dt.month
    while (current_year, current_month) <= (end_dt.year, end_dt.month):
        month_start = datetime(
            current_year, current_month, 1, tzinfo=timezone.utc
        ).timestamp()

        # First day of the following month (handles December rollover);
        # the current month ends one second before it.
        if current_month == 12:
            next_year, next_month = current_year + 1, 1
        else:
            next_year, next_month = current_year, current_month + 1
        month_end = datetime(
            next_year, next_month, 1, tzinfo=timezone.utc
        ).timestamp() - 1

        # Clamp the month window to the requested range.
        results.append((
            current_year,
            current_month,
            int(max(start_epoch, month_start)),
            int(min(end_epoch, month_end)),
        ))

        # Move to next month
        current_year, current_month = next_year, next_month

    return results
1 change: 1 addition & 0 deletions django_project/gap/admin/preferences.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class PreferencesAdmin(admin.ModelAdmin):
'dask_threads_num_api',
'api_log_batch_size',
'api_use_x_accel_redirect',
'api_use_parquet',
'user_file_uploader_config'
)
}
Expand Down
151 changes: 119 additions & 32 deletions django_project/gap/ingestor/farm_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def get_farm_id_key(row):
class DCASFarmRegistryIngestor(BaseIngestor):
"""Ingestor for DCAS Farmer Registry data."""

BATCH_SIZE = 100000

def __init__(self, session: IngestorSession, working_dir='/tmp'):
"""Initialize the ingestor with session and working directory.
Expand All @@ -114,6 +116,16 @@ def __init__(self, session: IngestorSession, working_dir='/tmp'):
# Placeholder for the group created during this session
self.group = None

self.farm_list = []
self.registry_list = []
# Initialize lookup dictionaries
self.crop_lookup = {
c.name.lower(): c for c in Crop.objects.all()
}
self.stage_lookup = {
s.name.lower(): s for s in CropStageType.objects.all()
}

def _extract_zip_file(self):
"""Extract the ZIP file to a temporary directory."""
dir_path = os.path.join(self.working_dir, str(uuid.uuid4()))
Expand All @@ -134,7 +146,10 @@ def _extract_zip_file(self):

    def _create_registry_group(self):
        """Create a new FarmRegistryGroup for this ingestion run.

        The group name embeds the current UTC timestamp at minute
        precision so each run's group is identifiable.
        """
        group_name = "farm_registry_" + \
            datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')
        self.group = self.group_model.objects.create(
            name=group_name,
            date_time=datetime.now(timezone.utc),
            # NOTE(review): marks this group as latest; presumably older
            # groups are demoted elsewhere - confirm.
            is_latest=True
        )
Expand All @@ -153,52 +168,111 @@ def _parse_planting_date(self, date_str):
logger.error(f"Invalid date format: {date_str}")
return None # Return None for invalid dates

    def _bulk_insert_farms_and_registries(self):
        """Bulk insert Farms first, then FarmRegistry.

        Drains ``self.farm_list`` / ``self.registry_list`` in one atomic
        transaction. On failure the exception is recorded on the session
        (status FAILED) and NOT re-raised, so the caller keeps running.
        """
        # Nothing buffered yet - no-op.
        if not self.farm_list:
            return

        try:
            with transaction.atomic():  # Ensure database consistency
                # Step 1: Bulk insert farms and retrieve saved instances
                # ignore_conflicts skips farms whose unique_id already
                # exists instead of raising.
                Farm.objects.bulk_create(
                    [Farm(**data) for data in self.farm_list],
                    ignore_conflicts=True
                )

                # Step 2: Fetch inserted farms with primary keys
                # (bulk_create with ignore_conflicts does not return PKs,
                # so re-query by unique_id).
                farm_map = {
                    farm.unique_id: farm
                    for farm in Farm.objects.filter(
                        unique_id__in=[
                            data["unique_id"] for data in self.farm_list
                        ]
                    )
                }

                # Step 3: Prepare FarmRegistry objects using mapped farms
                registries = []
                for data in self.registry_list:
                    farm_instance = farm_map.get(data["farm_unique_id"])
                    if not farm_instance:
                        continue  # Skip if farm does not exist

                    registries.append(FarmRegistry(
                        group=data["group"],
                        farm=farm_instance,
                        crop=data["crop"],
                        crop_stage_type=data["crop_stage_type"],
                        planting_date=data["planting_date"],
                    ))

                # Step 4: Bulk insert FarmRegistry only if valid records exist
                if registries:
                    FarmRegistry.objects.bulk_create(
                        registries, ignore_conflicts=True
                    )

            # Clear batch lists
            self.farm_list.clear()
            self.registry_list.clear()

        except Exception as e:
            # Best-effort: record the failure on the session rather than
            # propagating, so remaining batches are not aborted.
            logger.error(f"Bulk insert failed due to {e}")
            self.session.status = IngestorSessionStatus.FAILED
            self.session.notes = str(e)
            self.session.save()

def _process_row(self, row):
"""Process a single row from the input file."""
"""Process a single row from the CSV file."""
try:
# Parse latitude and longitude to create a geometry point
latitude = float(row[Keys.FINAL_LATITUDE])
longitude = float(row[Keys.FINAL_LONGITUDE])
point = Point(x=longitude, y=latitude, srid=4326)

# get crop and stage type
crop_key = Keys.get_crop_key(row)
crop_with_stage = row[crop_key].lower().split('_')
crop, _ = Crop.objects.get_or_create(
name__iexact=crop_with_stage[0],
defaults={
'name': crop_with_stage[0].title()
}
)
stage_type = CropStageType.objects.get(
Q(name__iexact=crop_with_stage[1]) |
Q(alias__iexact=crop_with_stage[1])
)
crop_name = row[crop_key].lower().split('_')[0]

# Parse planting date dynamically
planting_date_key = Keys.get_planting_date_key(row)
planting_date = self._parse_planting_date(row[planting_date_key])
crop = self.crop_lookup.get(crop_name)
if not crop:
crop, _ = Crop.objects.get_or_create(name__iexact=crop_name)
self.crop_lookup[crop_name] = crop

# Get or create the Farm instance
farmer_id_key = Keys.get_farm_id_key(row)
farm, _ = Farm.objects.get_or_create(
unique_id=row[farmer_id_key].strip(),
defaults={
'geometry': point,
'crop': crop
}
stage_type = self.stage_lookup.get(
row[crop_key].lower().split('_')[1]
)
if not stage_type:
stage_type = CropStageType.objects.filter(
Q(name__iexact=row[crop_key]) | Q(
alias__iexact=row[crop_key])
).first()
self.stage_lookup[row[crop_key]] = stage_type

# Create the FarmRegistry entry
FarmRegistry.objects.update_or_create(
group=self.group,
farm=farm,
crop=crop,
crop_stage_type=stage_type,
planting_date=planting_date,
farmer_id_key = Keys.get_farm_id_key(row)
# Store farm data as a dictionary
farm_data = {
"unique_id": row[farmer_id_key].strip(),
"geometry": point,
"crop": crop
}
self.farm_list.append(farm_data)

planting_date = self._parse_planting_date(
row[Keys.get_planting_date_key(row)]
)

registry_data = {
"group": self.group,
"farm_unique_id": row[farmer_id_key].strip(),
"crop": crop,
"crop_stage_type": stage_type,
"planting_date": planting_date,
}
self.registry_list.append(registry_data)

# Batch process every BATCH_SIZE records
if len(self.farm_list) >= self.BATCH_SIZE:
self._bulk_insert_farms_and_registries()

except Exception as e:
logger.error(f"Error processing row: {row} - {e}")

Expand Down Expand Up @@ -254,5 +328,18 @@ def run(self):
dir_path = self._extract_zip_file()
try:
self._run(dir_path)

# Final batch insert (ensures all remaining farms are inserted)
if self.farm_list:
self._bulk_insert_farms_and_registries()

# If no errors occurred in `_run()`, mark as SUCCESS
if self.session.status != IngestorSessionStatus.FAILED:
self.session.status = IngestorSessionStatus.SUCCESS
except Exception as e:
self.session.status = IngestorSessionStatus.FAILED
self.session.notes = str(e)
finally:
shutil.rmtree(dir_path)
self.session.end_at = datetime.now(timezone.utc)
self.session.save()
18 changes: 18 additions & 0 deletions django_project/gap/migrations/0047_preferences_api_use_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.7 on 2025-02-04 06:05

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``api_use_parquet`` boolean flag to ``Preferences``.

    Auto-generated by Django; defaults to False so existing deployments
    keep using the EAV reader until the flag is enabled.
    """

    dependencies = [
        ('gap', '0046_alter_dataset_store_type_alter_datasourcefile_format'),
    ]

    operations = [
        migrations.AddField(
            model_name='preferences',
            name='api_use_parquet',
            field=models.BooleanField(default=False, help_text='When set to True, API will use parquet reader instead of EAV.'),
        ),
    ]
Loading

0 comments on commit 1d6760e

Please sign in to comment.