Commits (49 total; changes shown from 34 commits)
00e331a  feat: added multipart initate upload route (Oct 7, 2025)
855f2ec  feat: added all multipart upload functions (Oct 7, 2025)
d0f2f18  Merge branch 'develop' into feat/resumable-uploads (Oct 7, 2025)
480636f  Merge branch 'develop' into feat/resumable-uploads (Oct 10, 2025)
bf0bb88  feat(UploadBox): Swapped upload from single to multipart upload using… (Oct 12, 2025)
519e0e2  feat(frontend): add @uppy/drag-drop and @uppy/progress-bar dependenci… (Oct 13, 2025)
d151423  feat(frontend): feat: integrate Uppy for image uploads and enhance fi… (Oct 13, 2025)
5a4e5c1  feat(image-upload): Refactor S3 integration to use boto3 instead of M… (Oct 16, 2025)
b4f22b3  feat(image-upload-workflow): Refactor code structure for improved rea… (Oct 20, 2025)
312279d  feat(image-upload-workflow): Implement drone image processing workflo… (Oct 21, 2025)
ba3ac56  feat(image-upload-workflow): Refactor image upload components and enh… (Oct 26, 2025)
61b8d01  feat(image-upload-workflow): Refactor image upload components and enh… (Oct 26, 2025)
00724e5  fixed with minio (Oct 27, 2025)
66dbc58  feat(image-upload-workflow): Add project_id and filename to the uploa… (Oct 27, 2025)
59ea669  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Oct 27, 2025)
0621bec  feat(image-upload-workflow): feat(migrations): Enhance project_images… (Oct 27, 2025)
0ab2c78  Merge branch 'feat/resumable-uploads' of https://github.com/hotosm/dr… (Oct 27, 2025)
17099fa  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Oct 27, 2025)
e6046b8  fix(image-upload-workflow): feat(step-switcher): Implement generic St… (Oct 30, 2025)
3103d8d  feat(image-upload-workflow): feat(db-models): Add DbProjectImage mode… (Oct 31, 2025)
82940c8  feat(image-upload-workflow): Enhance image processing with new databa… (Oct 31, 2025)
6c26e6c  fix(image-upload-workflow): refactor(image-upload-workflow): Clean up… (Oct 31, 2025)
c226707  refactor(image-upload-workflow): Replace deprecated get_presigned_url… (Nov 6, 2025)
dc926ea  fix(image-upload-workflow): Implement upsert functionality for image … (Nov 10, 2025)
3ba4818  Added multipart imagery upload capability (#649) (FeezyHendrix, Nov 10, 2025)
e32c2de  feat(image-upload-worklow): Implement image classification functionality (Nov 12, 2025)
dffa82f  feat(image-upload-workflow): Add sharpness_score field to project_ima… (Nov 12, 2025)
28b5725  feat(image-upload-workflow): Enhance image classification with batch … (Nov 17, 2025)
f565c95  feat(image-upload-workflow): Update image status references from 'upl… (Nov 24, 2025)
779d207  feat(image-upload-workflow): Add logging for classification requests … (Nov 26, 2025)
8e29f4f  feat(image-upload-workflow): feat(image-processing-workflow): Impleme… (Nov 26, 2025)
cc9b3a4  fix(merge): Refactor Map Context and Component Integration (Nov 26, 2025)
a34e7fa  fix(image-upload-workflow): Merged conflicts (Nov 26, 2025)
84256a6  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Nov 26, 2025)
33849fb  feat(image-upload-workflow): Update migration and enhance image class… (Dec 1, 2025)
3512444  feat(image-upload-workflow): Improve exception handling in _convert_e… (Dec 1, 2025)
9aa5446  refactor(image-upload-workflow): Improve code formatting and readabil… (Dec 1, 2025)
27b1e52  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Dec 1, 2025)
b849e97  feat(image-upload-workflow): add image detail modal and improve image… (Dec 2, 2025)
230c219  fix(image-upload-workflow): enhance EXIF value handling and improve i… (Dec 2, 2025)
71deb4f  fix(image-upload-workflow): enhance loading and empty states in image… (Dec 2, 2025)
d49bf41  fix(image-upload-workflow): implement duplicate image handling and st… (Dec 2, 2025)
8ba45ca  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Dec 2, 2025)
6c7d5b2  feat(image-upload-workflow): add thumbnail generation and upload func… (Dec 4, 2025)
9c36779  fix(image-upload-workflow): improve code readability with consistent … (Dec 4, 2025)
1dc704d  feat(image-upload-workflow): enhance image review process with batch … (FeezyHendrix, Dec 8, 2025)
e784b0c  feat(image-upload-workflow): Enhance image classification workflow wi… (FeezyHendrix, Dec 15, 2025)
298ff4a  fix(image-upload-workflow): feat: update dependencies and enhance tas… (FeezyHendrix, Dec 15, 2025)
a4acea3  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Dec 15, 2025)
4 changes: 4 additions & 0 deletions compose.yaml
@@ -28,6 +28,8 @@ services:
- ${PROJECT_DIR:-.}/src/backend/packages/drone-flightplan/drone_flightplan:/opt/python/lib/python3.11/site-packages/drone_flightplan:ro
- frontend-html:/project/src/backend/frontend_html
env_file: .env
extra_hosts:
- "localhost:host-gateway"
networks:
- dtm-network
restart: unless-stopped
@@ -130,6 +132,8 @@ services:
depends_on:
- redis
env_file: .env
extra_hosts:
- "localhost:host-gateway"
networks:
- dtm-network
volumes:
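The new `extra_hosts` entries map `localhost` inside the backend and worker containers to Docker's `host-gateway`, i.e. the host machine. Presumably this lets URLs that reference `localhost` (such as presigned MinIO URLs minted on the host) resolve correctly from inside the containers. A minimal sketch of the behaviour, assuming MinIO is published on host port 9000 (the service and port are assumptions, not taken from this diff):

# Run inside a container that carries the "localhost:host-gateway" mapping.
# With the mapping, "localhost" resolves to the Docker host, so a URL minted
# against http://localhost:9000 (hypothetical MinIO endpoint) is reachable
# from the container as well as from the host.
import urllib.request

url = "http://localhost:9000/minio/health/live"  # hypothetical MinIO endpoint

try:
    with urllib.request.urlopen(url, timeout=5) as resp:
        print(f"reached the host via localhost: HTTP {resp.status}")
except OSError as exc:
    print(f"localhost did not reach the host gateway: {exc}")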
215 changes: 213 additions & 2 deletions src/backend/app/arq/tasks.py
@@ -1,15 +1,26 @@
import asyncio
from typing import Any, Dict
from typing import Any, Dict, Optional
from uuid import UUID

from arq import ArqRedis, create_pool
from arq.connections import RedisSettings, log_redis_info
from fastapi import HTTPException
from loguru import logger as log
from psycopg.rows import dict_row

from app.config import settings
from app.db.database import get_db_connection_pool
from app.models.enums import HTTPStatus
from app.images.image_logic import (
calculate_file_hash,
check_duplicate_image,
create_project_image,
extract_exif_data,
)
from app.images.image_schemas import ProjectImageCreate, ProjectImageOut
from app.models.enums import HTTPStatus, ImageStatus
from app.projects.project_logic import process_all_drone_images, process_drone_images
from app.s3 import async_get_obj_from_bucket
from app.projects.image_classification import ImageClassifier


async def startup(ctx: Dict[Any, Any]) -> None:
@@ -75,6 +86,204 @@ async def count_project_tasks(ctx: Dict[Any, Any], project_id: str) -> Dict[str,
raise


async def _save_image_record(
db,
project_id: str,
filename: str,
file_key: str,
file_hash: str,
uploaded_by: str,
exif_dict: Optional[dict] = None,
location: Optional[dict] = None,
status: ImageStatus = ImageStatus.STAGED,
batch_id: Optional[str] = None,
) -> ProjectImageOut:
"""Save image record to database.

Args:
db: Database connection
project_id: Project UUID
filename: Original filename
file_key: S3 key
file_hash: MD5 hash
uploaded_by: User ID
exif_dict: EXIF data (optional)
location: GPS location (optional)
status: Image status (STAGED, INVALID_EXIF, etc.)
batch_id: Batch UUID for grouping uploads (optional)

Returns:
ProjectImageOut: Saved image record
"""
image_data = ProjectImageCreate(
project_id=UUID(project_id),
filename=filename,
s3_key=file_key,
hash_md5=file_hash,
location=location,
exif=exif_dict,
uploaded_by=uploaded_by,
status=status,
batch_id=UUID(batch_id) if batch_id else None,
)

image_record = await create_project_image(db, image_data)
await db.commit()

log.info(
f"Saved: {filename} | Status: {status} | "
f"GPS: {location is not None} | EXIF: {exif_dict is not None}"
)

return image_record


async def process_uploaded_image(
ctx: Dict[Any, Any],
project_id: str,
file_key: str,
filename: str,
uploaded_by: str,
batch_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Background task to process uploaded image: extract EXIF, calculate hash, save to DB.

This function ALWAYS saves the image record, even if EXIF extraction fails,
so that all uploaded images can be reviewed during the classification phase.

Args:
ctx: ARQ context
project_id: UUID of the project
file_key: S3 key of the uploaded file
filename: Original filename
        uploaded_by: User ID who uploaded
        batch_id: Batch UUID for grouping uploads (optional)

Returns:
dict: Processing result with image_id and status
"""
job_id = ctx.get("job_id", "unknown")
log.info(f"Starting process_uploaded_image (Job ID: {job_id}): {filename}")

try:
# Get database connection from pool
db_pool = ctx.get("db_pool")
if not db_pool:
            raise RuntimeError("Database pool not initialized")

async with db_pool.connection() as db:
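            # Step 1: Download the file from S3 and compute its content hash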
log.info(f"Downloading file from S3: {file_key}")
file_obj = await async_get_obj_from_bucket(
settings.S3_BUCKET_NAME, file_key
)
file_content = file_obj.read()

log.info(f"Calculating hash for: {filename}")
file_hash = calculate_file_hash(file_content)

# Step 2: Check for duplicates (idempotent behavior)
duplicate_id = await check_duplicate_image(db, UUID(project_id), file_hash)
if duplicate_id:
log.info(f"Duplicate detected: {file_hash} -> {duplicate_id}")
sql = "SELECT * FROM project_images WHERE id = %(id)s"
async with db.cursor(row_factory=dict_row) as cur:
await cur.execute(sql, {"id": str(duplicate_id)})
existing_record = await cur.fetchone()

return {
"image_id": str(duplicate_id),
"status": existing_record["status"],
"has_gps": existing_record["location"] is not None,
"is_duplicate": True,
"message": "Duplicate image (idempotent)",
}

# Step 3: Extract EXIF (try-catch to handle failures gracefully)
exif_dict = None
location = None

try:
log.info(f"Extracting EXIF from: {filename}")
exif_dict, location = extract_exif_data(file_content)

if exif_dict:
log.info(
f" EXIF: {len(exif_dict)} tags | GPS: {location is not None}"
)
log.debug(f"EXIF tags: {list(exif_dict.keys())[:10]}")
else:
log.warning(f"No EXIF data in: {filename}")

except Exception as exif_error:
log.error(f"EXIF extraction failed for {filename}: {exif_error}")

# Step 4: Determine status
status = ImageStatus.STAGED if exif_dict else ImageStatus.INVALID_EXIF

# Step 5: Save image record (ALWAYS save, even if EXIF failed)
image_record = await _save_image_record(
db=db,
project_id=project_id,
filename=filename,
file_key=file_key,
file_hash=file_hash,
uploaded_by=uploaded_by,
exif_dict=exif_dict,
location=location,
status=status,
batch_id=batch_id,
)

log.info(
f"Completed (Job: {job_id}): "
f"ID={image_record.id} | Status={status} | "
f"EXIF={'Yes' if exif_dict else 'No'} | GPS={'Yes' if location else 'No'}"
)

return {
"image_id": str(image_record.id),
"status": image_record.status,
"has_gps": location is not None,
"is_duplicate": False,
}

except Exception as e:
log.error(f"Failed (Job: {job_id}): {str(e)}")
raise


async def classify_image_batch(
ctx: Dict[Any, Any],
project_id: str,
batch_id: str,
) -> Dict:
job_id = ctx.get("job_id", "unknown")
log.info(f"Starting batch classification job {job_id} for batch {batch_id}")

db_pool = ctx.get("db_pool")
if not db_pool:
raise RuntimeError("Database pool not initialized in ARQ context")

try:
async with db_pool.connection() as conn:
result = await ImageClassifier.classify_batch(
conn,
UUID(batch_id),
UUID(project_id),
)

log.info(
f"Batch classification complete: "
f"Total={result['total']}, Assigned={result['assigned']}, "
f"Rejected={result['rejected']}, Unmatched={result['unmatched']}"
)

return result

except Exception as e:
log.error(f"Batch classification failed: {str(e)}")
raise


class WorkerSettings:
"""ARQ worker configuration"""

@@ -84,6 +293,8 @@ class WorkerSettings:
count_project_tasks,
process_drone_images,
process_all_drone_images,
process_uploaded_image,
classify_image_batch,
]

queue_name = "default_queue"
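With `process_uploaded_image` and `classify_image_batch` registered in `WorkerSettings.functions`, API routes can enqueue them by name through an `ArqRedis` pool. A hypothetical enqueue sketch follows (the helper and the Redis host are assumptions; only the task name and parameters come from this diff). Note that ARQ injects the worker context `ctx` itself, so the caller passes only the remaining arguments:

from typing import Optional

from arq import create_pool
from arq.connections import RedisSettings


async def enqueue_image_processing(
    project_id: str,
    file_key: str,
    filename: str,
    uploaded_by: str,
    batch_id: Optional[str] = None,
) -> Optional[str]:
    """Hypothetical helper: hand a freshly uploaded file to the ARQ worker."""
    redis = await create_pool(RedisSettings(host="redis"))  # assumed Redis host
    job = await redis.enqueue_job(
        "process_uploaded_image",  # must match the name registered in WorkerSettings
        project_id,
        file_key,
        filename,
        uploaded_by,
        batch_id=batch_id,
    )
    # enqueue_job returns None when a job with the same ID is already queued
    return job.job_id if job else None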
73 changes: 72 additions & 1 deletion src/backend/app/db/db_models.py
@@ -5,6 +5,7 @@
from sqlalchemy import (
ARRAY,
Boolean,
CHAR,
Column,
DateTime,
Enum,
@@ -15,8 +16,9 @@
LargeBinary,
SmallInteger,
String,
Text,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import (
declarative_base,
object_session,
@@ -26,6 +28,7 @@
from app.models.enums import (
FinalOutput,
ImageProcessingStatus,
ImageStatus,
OAMUploadStatus,
ProjectStatus,
ProjectVisibility,
@@ -232,6 +235,74 @@ def tasks_bad(self):
)


class DbProjectImage(Base):
"""Describes an uploaded image for a project."""

__tablename__ = "project_images"

id = cast(str, Column(UUID(as_uuid=True), primary_key=True))
project_id = cast(
str,
Column(
UUID(as_uuid=True),
ForeignKey("projects.id", ondelete="CASCADE"),
nullable=False,
),
)
task_id = cast(
str,
Column(
UUID(as_uuid=True),
ForeignKey("tasks.id", ondelete="SET NULL"),
nullable=True,
),
)
filename = cast(str, Column(Text, nullable=False))
s3_key = cast(str, Column(Text, nullable=False))
hash_md5 = cast(str, Column(CHAR(32), nullable=False))
batch_id = cast(str, Column(UUID(as_uuid=True), nullable=True))
location = cast(WKBElement, Column(Geometry("POINT", srid=4326), nullable=True))
exif = cast(dict, Column(JSONB, nullable=True))
uploaded_by = cast(
str, Column(String, ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
)
uploaded_at = cast(datetime, Column(DateTime, default=timestamp, nullable=False))
classified_at = cast(datetime, Column(DateTime, nullable=True))
status = cast(
ImageStatus,
Column(Enum(ImageStatus), default=ImageStatus.UPLOADED, nullable=False),
)
rejection_reason = cast(str, Column(Text, nullable=True))
sharpness_score = cast(float, Column(Float, nullable=True))
duplicate_of = cast(
str,
Column(
UUID(as_uuid=True),
ForeignKey("project_images.id", ondelete="SET NULL"),
nullable=True,
),
)

# Relationships
project = relationship(DbProject, backref="images")
task = relationship(DbTask, backref="images")
uploader = relationship(
DbUser, foreign_keys=[uploaded_by], backref="uploaded_images"
)

__table_args__ = (
Index("idx_project_images_project_id", "project_id"),
Index("idx_project_images_task_id", "task_id"),
Index("idx_project_images_status", "status"),
Index("idx_project_images_batch_id", "batch_id"),
Index("idx_project_images_hash_md5", "hash_md5"),
Index("idx_project_images_uploaded_by", "uploaded_by"),
Index("idx_project_images_location", location, postgresql_using="gist"),
Index("idx_project_images_batch_status", "batch_id", "status"),
{},
)


class TaskEvent(Base):
__tablename__ = "task_events"

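One note on the indexes: the composite `idx_project_images_batch_status` index matches the hot path of `classify_image_batch`, fetching every image in a batch with a given status. An illustrative query sketch against the new model (a hypothetical helper using a synchronous SQLAlchemy session, not code from this PR):

from sqlalchemy import select

from app.db.db_models import DbProjectImage
from app.models.enums import ImageStatus


def staged_images_for_batch(session, batch_id):
    """Hypothetical helper: all STAGED images in a batch, oldest upload first."""
    stmt = (
        select(DbProjectImage)
        .where(DbProjectImage.batch_id == batch_id)
        .where(DbProjectImage.status == ImageStatus.STAGED)
        .order_by(DbProjectImage.uploaded_at)
    )
    return session.execute(stmt).scalars().all()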