Last Updated: May 5, 2026

Project Status: Phase 4 Complete | Phase 5 Complete | Phase 6 Complete | Phase 7 Complete | Phase 8 Complete | Phase 9 Complete (Ant Design Migration) | Code Health Review Complete | Performance Optimization Complete

AI Assistant Guide: Use this document to understand the project architecture, conventions, and development practices.
Problem: Upsert operations were processing rows one-by-one, causing 20,000+ database queries for 10,000 rows (2 queries per row: SELECT + INSERT/UPDATE). Tasks with 10,000+ rows took 5-10 minutes to complete.
Solution: Implemented batch processing with bulk operations:
- Process rows in batches of 500
- Single SELECT query fetches all existing records per batch
- Bulk UPDATE using SQL CASE statements for multiple rows
- Bulk INSERT for new records
- Result: roughly 30-60x faster runs (10 minutes → 10-20 seconds), with about 330x fewer database queries
Technical Details:
- New function: `_process_upsert_batch()` - batch-level upsert logic
- New function: `_bulk_update_rows()` - generates CASE-based UPDATE statements
- Batch size: 500 rows (configurable via BATCH_SIZE constant)
- Total queries reduced from 20,000 to ~60 for 10,000 rows
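The batching approach described above can be sketched roughly as follows. This is a minimal illustration, not the project's actual `_bulk_update_rows()`: the names `chunked` and `build_case_update` and the bind-name scheme are hypothetical, and table/column identifiers are assumed to come from trusted configuration because they are interpolated into the SQL text. None values are left out of the CASE so the stored value is preserved.

```python
from typing import Any, Iterator

BATCH_SIZE = 500  # batch size quoted in the notes above


def chunked(rows: list[dict[str, Any]], size: int = BATCH_SIZE) -> Iterator[list[dict[str, Any]]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def build_case_update(
    table: str, key: str, columns: list[str], rows: list[dict[str, Any]]
) -> tuple[str, dict[str, Any]]:
    """Build one UPDATE that sets each column through CASE <key> WHEN ... THEN ... END.

    Hypothetical helper: identifiers are interpolated, values are bound.
    """
    binds: dict[str, Any] = {f"k{i}": row[key] for i, row in enumerate(rows)}
    set_clauses = []
    for col in columns:
        whens = []
        for i, row in enumerate(rows):
            if row.get(col) is None:
                continue  # skip None so the existing value is kept (ELSE branch)
            binds[f"{col}_{i}"] = row[col]
            whens.append(f"WHEN :k{i} THEN :{col}_{i}")
        if whens:
            set_clauses.append(f"{col} = CASE {key} {' '.join(whens)} ELSE {col} END")
    if not set_clauses:
        raise ValueError("no non-null values to update")
    keys = ", ".join(f":k{i}" for i in range(len(rows)))
    sql = f"UPDATE {table} SET {', '.join(set_clauses)} WHERE {key} IN ({keys})"
    return sql, binds
```

With one SELECT, one UPDATE, and one INSERT per batch, 10,000 rows in batches of 500 come to 20 batches and about 60 statements, which matches the figure above.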
- Oracle DATE Insertion Fixed
  - Issue: DATE columns inserting NULL with ORA-01861 error
  - Root cause: Oracle DATE requires TO_DATE() wrapper, not raw strings
  - Fix: Auto-detect YYYY-MM-DD format and wrap with `TO_DATE(:bind, 'YYYY-MM-DD')`
  - Affected: INSERT and UPDATE statements
- Duplicate TaskRun Creation Fixed
  - Issue: Every "Run" button click created 2 TaskRun records
  - Root cause: Both API endpoint and worker were creating records
  - Fix: Removed TaskRun creation from API endpoint; worker owns lifecycle
- NULL Update Constraint Violation Fixed
  - Issue: ORA-01407 when updating NOT NULL columns to None
  - Root cause: UPDATE attempted to set missing/None values
  - Fix: Skip columns with None values during UPDATE (preserve existing data)
- Duration Display Fixed
  - Issue: Duration showing "N/A" in UI despite completed runs
  - Root cause: Missing `duration_seconds` in API response + incorrect field names
  - Fix: Added duration calculation to all run endpoints; fixed field name mismatches
- Upsert Keys Parsing Fixed
  - Issue: "JSON object must be str" error when task.upsert_keys is already a list
  - Fix: Type-check before parsing (handle list, string, or None)
- Upsert Settings UI (TaskDetail page)
  - Toggle upsert mode on/off
  - Manage unique key columns (add/remove)
  - Configure skip conditions (skip_column + skip_value)
  - Visual indicators (badges show enabled/disabled status)
- Column Mapping Display Fixed
  - Column Mappings tab now displays existing mappings
  - Previously showed creation wizard incorrectly
  - Fix: Pass `existingMappings` prop to ColumnMappingEditor
- Configurable Log Level
  - Added `LOG_LEVEL` environment variable support
  - Default: INFO
  - Usage: `set LOG_LEVEL=DEBUG` (Windows) or `export LOG_LEVEL=DEBUG` (Unix)
- Enhanced Debug Logging
  - Date parsing debug output (shows input/output for to_date transform)
  - SQL generation logging (shows generated INSERT/UPDATE with TO_DATE wrappers)
  - Mapping pipeline logging (source values, transforms, mapped results)
  - Batch progress logging (inserted/updated/skipped/errors per batch)
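Three of the fixes listed above (DATE wrapping, skipping None on UPDATE, and tolerant upsert-key parsing) are small enough to sketch in a few lines. The function names below are hypothetical and the code is an illustration under those assumptions, not the project's implementation.

```python
import json
import re
from typing import Any

DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")  # YYYY-MM-DD only


def placeholder_for(column: str, value: Any) -> str:
    """Return a bind placeholder, wrapped in TO_DATE for YYYY-MM-DD strings."""
    if isinstance(value, str) and DATE_RE.match(value):
        return f"TO_DATE(:{column}, 'YYYY-MM-DD')"
    return f":{column}"


def update_assignments(row: dict[str, Any]) -> list[str]:
    """Build SET assignments, skipping None so existing values are preserved."""
    return [
        f"{col} = {placeholder_for(col, val)}"
        for col, val in row.items()
        if val is not None
    ]


def parse_upsert_keys(raw: Any) -> list[str]:
    """Accept a list, a JSON string, or None, and always return a list."""
    if raw is None:
        return []
    if isinstance(raw, list):
        return [str(key) for key in raw]
    if isinstance(raw, str):
        if not raw.strip():
            return []
        parsed = json.loads(raw)
        return [parsed] if isinstance(parsed, str) else [str(key) for key in parsed]
    raise TypeError(f"unsupported upsert_keys type: {type(raw).__name__}")
```

Checking the type before calling `json.loads` is what avoids the "JSON object must be str" error when the value has already been deserialized.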
IntakeGateway is a full-stack web application that enables users to:
- Create data import tasks that fetch from external APIs
- Configure API endpoints with headers, authentication, and request bodies
- Map API response fields to database columns
- Trigger task executions and monitor runs
- View detailed logs, statistics, and error reports
Backend:
- Python 3.11 with FastAPI
- SQLAlchemy ORM with SQLite app state and a separate destination DB layer
- Celery for async task execution
- APScheduler for cron-based task scheduling
- Pydantic v2 for data validation (`task.py`, `task_run.py`, and `column_mapping.py` schemas use `model_config = ConfigDict(from_attributes=True)`; `connection.py` and `schedule.py` still use Pydantic v1-style `class Config`)
- pytest for testing (18 test files: 13 unit + 5 integration; 409+ tests passing)
- App state now lives in a local database, defaulting to SQLite via `APP_DATABASE_URL`.
- Destination database access is isolated from the app database. Oracle is the current ingestion target, but broken destination connectivity should not break core app routes.
- `connections.enc` is auto-created on first save, and missing or unreadable files are treated as an empty connection store.
Frontend:
- React 18.2 with TypeScript 5.3
- Vite 5.0 build tool
- React Router v6
- React Query 5.28 (TanStack Query) for server state management
- Ant Design 5 UI component library (migrated from Radix UI + Tailwind in Phase 9)
- @ant-design/icons for iconography
- dayjs for date handling
- Vitest + React Testing Library for testing (14 test files)
┌─────────────────────────────────────────────────────────────────┐
│ User Browser │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ React Frontend (Port 5173) │ │
│ │ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Dashboard │ │ TaskList │ │ RunDetail │ │ │
│ │ └──────┬──────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ React Query (Hooks + Cache) │ │ │
│ │ └──────────────────┬─────────────────────────┘ │ │
│ │ │ │ │
│ └─────────────────────┼────────────────────────────────────┘ │
│ │ HTTP/HTTPS │
│ ▼ │
├─────────────────────────────────────────────────────────────────┤
│ Network/Internet │
└─────────────────────────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ FastAPI Backend (Port 8000) │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ API Routes (v1) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌────────────┐ │ │
│ │ │ /tasks │ │ /runs │ │ /stats │ │ │
│ │ └────┬─────┘ └────┬─────┘ └─────┬──────┘ │ │
│ └───────┼─────────────┼──────────────┼────────────────────┘ │
│ │ │ │ │
│ ┌───────▼─────────────▼──────────────▼────────────────────┐ │
│ │ Service Layer │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ • TaskService (CRUD, validation) │ │ │
│ │ │ • RunService (execution, monitoring) │ │ │
│ │ │ • ApiConnector (external API calls) │ │ │
│ │ │ • Mapper (field mapping logic) │ │ │
│ │ │ • Normalizer (data transformation) │ │ │
│ │ │ • Validator (data validation) │ │ │
│ │ └─────────────────┬──────────────────────────────┘ │ │
│ └────────────────────┼──────────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼──────────────────────────────────┐ │
│ │ Celery Task Queue │ │
│ │ • Background task execution │ │
│ │ • Async run processing │ │
│ │ • Task scheduling │ │
│ └────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼──────────────────────────────────┐ │
│ │ Database Layer (SQLAlchemy) │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Task Model │ │ Run Model │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └────────────────────┬──────────────────────────────────┘ │
│ │ │
└───────────────────────┼────────────────────────────────────────┘
▼
┌───────────────────────┐ ┌──────────────────────────┐
│ App Database │ │ Destination Database │
│ (SQLite default) │ │ (Oracle currently) │
└───────────────────────┘ └──────────────────────────┘
-
User Creates Task:
- Frontend → POST /api/v1/tasks → Backend creates Task record
- Task stored in the local app DB with configuration
-
User Triggers Run:
- Frontend → POST /api/v1/runs → Backend creates Run record in the local app DB
- Celery worker picks up async task
- Worker calls ApiConnector to fetch data
- Data normalized and validated
- Fields mapped according to configuration
- Run state and logs are saved locally; inserts and updates go to the selected destination DB
-
User Views Run Details:
- Frontend → GET /api/v1/runs/{run_id} → Backend retrieves Run with logs and errors
backend/
├── app/
│ ├── main.py # FastAPI application entry point
│ │
│ ├── api/
│ │ └── v1/
│ │ ├── __init__.py
│ │ └── routes/
│ │ ├── runs.py # Run endpoints (GET, POST, detail)
│ │ ├── tasks.py # Task endpoints (CRUD)
│ │ ├── schedules.py # Schedule endpoints (CRUD)
│ │ ├── column_mappings.py # Column mapping endpoints
│ │ └── __init__.py
│ │
│ ├── core/
│ │ ├── config.py # App configuration
│ │ ├── logging.py # Logging setup
│ │ └── __init__.py
│ │
│ ├── db/
│ │ ├── oracle_pool.py # Oracle connection pooling
│ │ ├── session.py # Database session management
│ │ ├── models/
│ │ │ ├── task.py # Task ORM model
│ │ │ ├── task_run.py # TaskRun ORM model
│ │ │ └── __init__.py
│ │ ├── schemas/
│ │ │ ├── task.py # Pydantic schemas for Task
│ │ │ └── __init__.py
│ │ └── sql/
│ │ └── schema.sql # Database schema definition
│ │
│ ├── services/
│ │ ├── api_connector.py # External API communication
│ │ ├── mapper.py # Field mapping logic
│ │ ├── normalizer.py # Data transformation
│ │ ├── runner.py # Task execution logic
│ │ ├── scheduler.py # Task scheduling
│ │ ├── validator.py # Data validation
│ │ └── __init__.py
│ │
│ └── workers/
│ ├── celery_app.py # Celery configuration
│ ├── tasks.py # Celery task definitions
│ └── __init__.py
│
├── tests/
│ ├── unit/
│ │ ├── test_placeholder.py
│ │ ├── test_models.py
│ │ ├── test_mapper.py
│ │ ├── test_normalizer.py
│ │ ├── test_validator.py
│ │ ├── test_runner.py
│ │ ├── test_column_mappings.py
│ │ └── test_authentication.py
│ └── integration/
│ ├── test_api_endpoints.py
│ ├── test_schedule_routes.py
│ ├── test_mapping_pipeline.py
│ └── test_full_pipeline.py
│
├── pyproject.toml # Poetry dependencies
├── requirements.txt # pip requirements
├── Dockerfile # Docker image
└── README.md
frontend/
├── src/
│ ├── pages/ # Page components (8 pages)
│ │ ├── Dashboard.tsx # KPI cards, recent runs table, quick actions
│ │ ├── TaskList.tsx # Card-based task list with actions
│ │ ├── TaskDetail.tsx # Tabbed view (Details, Schedule, Mappings)
│ │ ├── TaskWizard.tsx # 6-step task creation wizard (Steps component)
│ │ ├── RunsList.tsx # Runs table with status tags
│ │ ├── RunDetail.tsx # Statistics, logs, error breakdown
│ │ ├── Schedules.tsx # Schedule table with filter controls
│ │ └── Settings.tsx # Database connection management
│ │
│ ├── components/ # Editor components
│ │ ├── ColumnMappingEditor.tsx # Field mapping with tree view
│ │ ├── ConnectionEditor.tsx # DB connection form
│ │ ├── ScheduleEditor.tsx # Cron schedule form
│ │ └── UpsertConfigEditor.tsx # Upsert/skip configuration
│ │
│ ├── hooks/
│ │ └── api.ts # React Query hooks (all entities)
│ │
│ ├── api/
│ │ └── client.ts # Axios HTTP client (all endpoints)
│ │
│ ├── types/
│ │ └── index.ts # TypeScript interfaces (all types)
│ │
│ ├── lib/
│ │ └── utils.ts # Date parsing/formatting utilities
│ │
│ ├── __tests__/ # Test suite (14 test files)
│ │ ├── setup.ts # jest-dom setup
│ │ ├── components/
│ │ │ ├── ColumnMappingEditor.test.tsx
│ │ │ ├── ConnectionEditor.test.tsx
│ │ │ ├── ScheduleEditor.test.tsx
│ │ │ └── ScheduleTab.test.tsx
│ │ └── pages/
│ │ ├── Dashboard.test.tsx
│ │ ├── TaskList.test.tsx
│ │ ├── TaskDetail.test.tsx
│ │ ├── RunsList.test.tsx
│ │ ├── RunDetail.test.tsx
│ │ ├── Schedules.test.tsx
│ │ ├── Settings.test.tsx
│ │ ├── TaskWizard.test.tsx
│ │ ├── TaskWizard-Mapping.test.tsx
│ │ └── TaskWizardAuth.test.tsx
│ │
│ ├── theme.ts # Ant Design theme configuration (ConfigProvider)
│ ├── App.tsx # Routing + AntD Layout (Sider, Menu, Content)
│ ├── main.tsx # Entry point
│ └── index.css # Minimal global styles
│
├── PROMPT.md # Ant Design UI specification
├── vite.config.ts # Vite configuration
├── tsconfig.json # TypeScript configuration
├── vitest.config.ts # Vitest configuration
├── package.json
└── README.md
GET /api/v1/tasks # List all tasks (paginated)
GET /api/v1/tasks/{task_id} # Get task details
POST /api/v1/tasks # Create new task
PATCH /api/v1/tasks/{task_id} # Update task
DELETE /api/v1/tasks/{task_id} # Delete task
GET /api/v1/runs # List all runs (paginated)
GET /api/v1/runs/{run_id} # Get run details
POST /api/v1/runs # Trigger new run
Run labels: Run responses include task_name, is_retry, and retry_of_run_id for UI labeling and retry badges.
GET /api/v1/schedules # List all schedules
POST /api/v1/tasks/{task_id}/schedule # Create schedule for task
GET /api/v1/tasks/{task_id}/schedule # Get task schedule
PUT /api/v1/schedules/{schedule_id} # Update schedule
DELETE /api/v1/schedules/{schedule_id} # Delete schedule
POST /api/v1/schedules/{id}/resume # Resume paused schedule
# Mapping CRUD (router prefix: /api/v1/tasks in main.py)
GET /api/v1/tasks/{task_id}/mappings # List mappings
POST /api/v1/tasks/{task_id}/mappings # Bulk create mappings
PUT /api/v1/mappings/{mapping_id} # Update mapping
DELETE /api/v1/mappings/{mapping_id} # Delete mapping
POST /api/v1/tasks/{task_id}/preview-fields # Fetch sample API response
# Oracle / utility endpoints (oracle_router, prefix /api/v1 in main.py)
GET /api/v1/oracle/tables/{table}/columns # Query Oracle metadata
POST /api/v1/preview-fields-standalone # Preview fields without saved task
POST /api/v1/suggest-transforms # Transform suggestions by type
The `oracle_router` is defined as a second `APIRouter()` in `column_mappings.py` and included in `main.py` via `app.include_router(oracle_router, prefix="/api/v1")` to keep Oracle-specific endpoints isolated from the mapping CRUD router.
GET /api/v1/stats/tasks # Task statistics
GET /api/v1/stats/runs # Run statistics
CREATE TABLE tasks (
id VARCHAR2(36) PRIMARY KEY,
name VARCHAR2(255) NOT NULL,
description CLOB,
endpoint_url VARCHAR2(2000) NOT NULL,
method VARCHAR2(10) NOT NULL,
headers CLOB, -- JSON
body CLOB, -- JSON
table_name VARCHAR2(255) NOT NULL,
field_mapping CLOB, -- JSON
is_active NUMBER(1) DEFAULT 1,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

CREATE TABLE task_runs (
id VARCHAR2(36) PRIMARY KEY,
task_id VARCHAR2(36) NOT NULL,
status VARCHAR2(50) NOT NULL, -- running, completed, failed, partial_failure
total_records NUMBER,
successful_records NUMBER,
failed_records NUMBER,
error_details CLOB, -- JSON
logs CLOB, -- JSON
started_at TIMESTAMP,
completed_at TIMESTAMP,
FOREIGN KEY (task_id) REFERENCES tasks(id)
);

- All business logic in `/services/` directory
- Each service handles one domain (Mapper, Validator, etc.)
- Services are testable and reusable
- All input validation through Pydantic v2 schemas
- Schemas use `model_config = ConfigDict(from_attributes=True)` rather than the deprecated V1 `class Config`; `connection.py` and `schedule.py` are the remaining V1-style exceptions
- Type hints on all functions
- Request/response models in `/db/schemas/`
- FastAPI dependencies for database session
- Clean separation of concerns
- Easy to test with mocks
- Long-running operations via Celery
- Non-blocking API responses
- Background workers handle heavy lifting
- All data fetching through hooks
- Centralized cache management
- Automatic refetching strategies
- Page components handle routing
- UI components are presentational
- Custom hooks for logic
- 100% TypeScript coverage
- Strict mode enabled
- All API responses typed
- Centralized error handling in ApiClient
- Error boundaries on pages
- User-friendly error messages
Location: backend/tests/unit/
Coverage Areas:
- Model validation (test_models.py)
- Data mapping (test_mapper.py)
- Data normalization (test_normalizer.py)
- Data validation (test_validator.py)
Running Tests:
cd backend
pytest tests/unit/ -v --tb=short

Test Count: 409+ tests passing
Location: frontend/src/__tests__/
Coverage Areas:
- Component rendering
- Hook behavior
- User interactions
- Error handling
- Navigation
- Schedule management
- Column mapping
Test Files:
- Dashboard.test.tsx
- TaskList.test.tsx
- TaskDetail.test.tsx
- RunsList.test.tsx
- RunDetail.test.tsx
- TaskWizard.test.tsx
- TaskWizard-Mapping.test.tsx
- TaskWizardAuth.test.tsx
- Schedules.test.tsx
- Settings.test.tsx
- ColumnMappingEditor.test.tsx
- ConnectionEditor.test.tsx
- ScheduleEditor.test.tsx
- ScheduleTab.test.tsx
Running Tests:
cd frontend
npm run test

Test Files: 14 test files
# Type hints required
def process_data(data: dict) -> dict:
"""Process data according to mapping."""
pass
# Docstrings for functions
class TaskService:
"""Service for task operations."""
async def create_task(self, task_data: TaskCreate) -> Task:
"""Create a new task."""
pass
# Use constants at module level
DEFAULT_TIMEOUT = 30
MAX_RETRIES = 3
# Logging instead of print
logger.info(f"Task {task_id} started")

// All components typed
interface TaskListProps {
tasks: Task[];
onDelete: (id: string) => void;
}
// Hooks follow naming convention
const useTaskData = (id: string) => {
return useQuery({...});
};
// Comments for complex logic
// Calculate success rate: successful / total
const successRate = (successful / total) * 100;
// Use constants for magic strings
const TASK_STATUS = {
ACTIVE: 'active',
INACTIVE: 'inactive',
} as const;

Backend:
cd backend
python -m uvicorn app.main:app --reload --port 8000

Frontend:
cd frontend
npm install
npm run dev

Common Issues & Fixes (Updated January 2026):
- PostCSS ES Module Error

      # Error: "module is not defined in ES module scope"
      # Fix: Rename postcss.config.js to .cjs extension
      cd frontend
      mv postcss.config.js postcss.config.cjs

  - Why: package.json has `"type": "module"`, requiring .cjs extension for CommonJS files
- Radix UI Installation Failure

      # Error: "No matching version found for @radix-ui/react-slot@^2.0.2"
      # Fix: Update package.json to use version 1.1.0

  - In `frontend/package.json`, ensure: `"@radix-ui/react-slot": "^1.1.0"`
  - Version 2.x is not yet available in npm registry
- Missing date-fns Dependency

      # Error: "date-fns imported but could not be resolved"
      cd frontend
      npm install date-fns

- Backend Missing uvicorn

      cd backend
      pip install -r requirements.txt

Access Points:
- Frontend: http://localhost:5173
- Backend API: http://localhost:8000
- API Docs: http://localhost:8000/docs
- Create feature branch

      git checkout -b feature/description

- Make changes to backend or frontend
  - Follow coding conventions above
  - Add tests for new functionality
  - Update types/interfaces
- Run tests

      # Backend
      cd backend && pytest tests/ -v
      # Frontend
      cd frontend && npm run test

- Commit changes

      git add .
      git commit -m "feat: description of changes"

- Never commit to main directly
- All tests must pass before merging
- Update types whenever changing data structures
- Keep commits atomic (one feature per commit)
- Write clear commit messages
- External API auth: Bearer token, API Key, HTTP Basic, and OAuth (Phase 7, complete)
- Credentials stored encrypted at rest using Fernet (`cryptography==46.0.7`)
- Encryption key read from `ENCRYPTION_KEY` env var; only the literal value `"dev-only"` triggers temp-key generation (not any string containing `"dev"`)
- OAuth: if `access_token` is absent from `oauth_config`, `apply_authentication()` raises `ValueError` immediately (fail-fast; no silent pass-through)
- CORS configured for localhost development
- API input validation through Pydantic v2 models
- TaskOut / TaskRunOut schemas never expose `api_key` or `password` fields
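The key-selection rule and the response filtering described above might look roughly like this. It is a sketch only: `resolve_encryption_key` and `redact` are hypothetical names, raising when the variable is unset is an assumption, and the temporary key is built with the standard library in the same shape a Fernet key has (32 random bytes, URL-safe base64 encoded).

```python
import base64
import os
from typing import Any, Mapping

DEV_SENTINEL = "dev-only"  # only this exact value triggers a temporary key
SENSITIVE_FIELDS = {"api_key", "password"}


def resolve_encryption_key(env: Mapping[str, str]) -> bytes:
    """Pick the encryption key from the environment (hypothetical helper)."""
    value = env.get("ENCRYPTION_KEY")
    if not value:
        # Assumption: a missing key is treated as a configuration error.
        raise RuntimeError("ENCRYPTION_KEY is not set")
    if value == DEV_SENTINEL:
        # Temporary key: 32 random bytes, URL-safe base64 encoded.
        return base64.urlsafe_b64encode(os.urandom(32))
    return value.encode("ascii")


def redact(task: Mapping[str, Any]) -> dict[str, Any]:
    """Drop credential fields before a task is returned to the client."""
    return {k: v for k, v in task.items() if k not in SENSITIVE_FIELDS}
```

Comparing against the exact sentinel, rather than testing for a substring, means a real key that happens to contain "dev" is used as given.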
- JWT token-based UI authentication (not yet implemented)
- Role-based access control (RBAC)
- Per-connection random PBKDF2 salts (current implementation uses a fixed salt)
- Connection pooling for Oracle Database
- Async operations with Celery
- Query optimization with proper indexing
- Caching strategies (future enhancement)
- Code splitting by route
- Lazy loading components
- React Query caching
- Bundle size: < 100KB (gzipped)
- Task and run tables indexed on id
- Status queries use indexed columns
- Pagination prevents loading all records
Issue: Oracle connection fails
# Solution: Check connection string in config.py
ORACLE_URL = "oracle+cx_oracle://user:password@host:1521/service"

Issue: Celery tasks not executing
# Solution: Start Celery worker
celery -A app.workers.celery_app worker --loglevel=info

Issue: API calls fail with CORS error
# Solution: Ensure backend CORS is configured for localhost:5173

Issue: Tests fail with "module not found"
# Solution: Clear node_modules and reinstall
rm -rf node_modules
npm install

- `app/main.py` - FastAPI app setup
- `app/api/v1/routes/tasks.py` - Task routes
- `app/api/v1/routes/runs.py` - Run routes
- `app/services/*.py` - Business logic
- `app/db/models/` - Database models
- `app/db/schemas/` - Pydantic schemas

- `src/App.tsx` - Routing setup
- `src/api/client.ts` - API client
- `src/hooks/api.ts` - React Query hooks
- `src/pages/*.tsx` - Page components
- `src/components/ui/*.tsx` - UI components
- Understand Context First
  - Read relevant documentation
  - Check existing patterns
  - Review related files
- Type Safety
  - Always use TypeScript/Python types
  - Update interfaces when changing data
  - Verify no type errors after changes
- Testing
  - Add tests for new functionality
  - Update existing tests if needed
  - Run full test suite before completing
- Code Quality
  - Follow existing code style
  - Add comments for complex logic
  - Keep functions focused and testable
- Documentation
  - Update README if public API changes
  - Add comments to complex sections
  - Keep this claude.md updated with major changes
# Always work on feature branch
git checkout -b feature/name
# Make changes, test, commit
git add .
git commit -m "feat: description"
# Never force push to main
git push origin feature/name

- Check error messages carefully
- Review relevant code sections
- Add logging/console output
- Test with minimal reproducible case
- Refer to documentation
- Understand requirements completely
- Design changes (data structure, API, UI)
- Implement with tests
- Update documentation
- Verify all tests pass
Phase 6 adds comprehensive column mapping functionality so that users can map API response fields (including nested JSON) to database columns during task creation. It features an advanced UI with hierarchical field display, automatic type detection, transform suggestions, and mapping templates.
API Routes (app/api/v1/routes/column_mappings.py):
Two routers are defined in this file:
- `router` - mapping CRUD endpoints (prefix `/api/v1` in `main.py`)
  - `GET /api/v1/tasks/{task_id}/mappings` - List mappings
  - `POST /api/v1/tasks/{task_id}/mappings` - Bulk create mappings
  - `PUT /api/v1/mappings/{mapping_id}` - Update mapping
  - `DELETE /api/v1/mappings/{mapping_id}` - Delete mapping
  - `POST /api/v1/tasks/{task_id}/preview-fields` - Fetch sample API response (manual/auto)
- `oracle_router` - Oracle/utility endpoints (prefix `/api/v1` in `main.py`)
  - `GET /api/v1/oracle/tables/{table_name}/columns` - Query Oracle metadata
  - `POST /api/v1/preview-fields-standalone` - Preview fields without a saved task
  - `POST /api/v1/suggest-transforms` - Suggest transforms by source/dest type

New Services:
- `oracle_metadata.py` - Query Oracle `USER_TAB_COLUMNS` for table schema
- `transform_suggester.py` - Recommend transforms based on type mismatches
- Enhanced `api_connector.py` - Fetch sample responses with lenient JSON parsing

Enhanced Models:
- `column_mapping.py` - ColumnMapping ORM model (already exists, exposed via API)
- `mapper.py` - Added transforms: `to_timestamp`, `to_date`, `format_date`

Pydantic Schemas (app/db/schemas/column_mapping.py - NEW):
- `ColumnMappingCreate` - Input for creating mappings
- `ColumnMappingOut` - API response model
- `ColumnMappingUpdate` - Update payload
- `BulkMappingCreate` - Batch create multiple mappings

New Types (src/types/index.ts):
- `ColumnMapping` interface (with metadata)
- `MappingPreview` interface (hierarchical tree structure)
- `OracleColumn` interface (column name + type)
- `FieldNode` interface (tree node with parent/children)
New Component (src/components/ColumnMappingEditor.tsx):
- Three-column layout: API Fields | Mapping Config | DB Columns
- Hierarchical tree view of nested fields (expandable)
- Copy-to-clipboard dot notation for each field (e.g., `user.address.city`)
- Add/remove mapping rows with dropdowns
- Transform multi-select dropdowns (8 available)
- Auto-suggest transform badges with click-to-apply
- Fetch sample button (auto-fetch OR manual paste)
- Save all mappings button
- Template save/load (localStorage)
- Unmapped fields warning badge
Enhanced TaskWizard (src/pages/TaskWizard.tsx):
- New Step 4.5: "Mapping Configuration"
- Embeds `<ColumnMappingEditor />` component
- At least 1 mapping required (blocking validation)
- Unmapped fields warning (non-blocking)
- "Skip for Now" option (creates task without mappings)
- Mappings saved to component state on "Next" click
Enhanced TaskDetail (src/pages/TaskDetail.tsx):
- New "Configure Mappings" tab/accordion
- Reuses `<ColumnMappingEditor />` component
- Advanced Options section:
- "Save as Template" button (save to localStorage)
- "Load Template" dropdown
- "Auto-Match by Name" toggle (case-insensitive name matching)
- "Apply Transform to All Strings" (batch operation)
- "Clear All Mappings" button
New Hooks (src/hooks/api.ts):
- `useColumnMappings(taskId)` - Fetch mappings list
- `useCreateMappings()` - Bulk create mappings
- `useUpdateMapping()` - Update single mapping
- `useDeleteMapping()` - Delete mapping
- `usePreviewFields(taskId, sampleJson?)` - Fetch flattened fields
- `useOracleColumns(tableName)` - Query DB columns
- `useSuggestTransforms(sourceType, destType)` - Transform suggestions
- `useSaveMappingTemplate()` - Save to localStorage
- Automatic Flattening: Converts nested objects to dot notation (e.g., `{"user": {"name": "Alice"}}` → `{"user.name": "Alice"}`)
- Hierarchical Display: Tree view shows nested structure intuitively
- Copy-to-Clipboard: Each leaf node has a button to copy the full path (e.g., `user.address.city`)
- Multiple Nesting Levels: Supports arbitrarily deep nesting
- Auto-Detection: Infers field types from sample API response (string, number, boolean, null, array, object)
- Oracle Metadata: Queries database for column types from `USER_TAB_COLUMNS`
- Transform Suggestions: Shows recommendations when types mismatch (e.g., string → number suggests `to_int`)
- Visual Warnings: Yellow badge alerts when manual transform needed
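The flattening, type detection, and suggestion behavior listed above can be illustrated with a short sketch. The function names and the suggestion table are assumptions made for the example; only the string → number → `to_int` pairing comes from the text, and the DATE and TIMESTAMP rows are plausible guesses.

```python
from typing import Any


def flatten(obj: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten nested objects to dot-notation keys; arrays are kept as-is."""
    flat: dict[str, Any] = {}
    for key, value in obj.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            flat.update(flatten(value, path))
        else:
            flat[path] = value  # primitives, None, arrays, and empty objects
    return flat


def infer_type(value: Any) -> str:
    """Map a sample JSON value to one of the type names listed above."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # must precede the int check: bool is an int subclass
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    return "unknown"


# Hypothetical suggestion table keyed by (source type, Oracle type family).
SUGGESTIONS = {
    ("string", "NUMBER"): ["to_int"],
    ("string", "DATE"): ["to_date"],
    ("string", "TIMESTAMP"): ["to_timestamp"],
}


def suggest_transforms(source_type: str, oracle_type: str) -> list[str]:
    family = oracle_type.upper().split("(")[0]  # TIMESTAMP(6) -> TIMESTAMP
    return SUGGESTIONS.get((source_type, family), [])
```

For instance, `flatten({"user": {"name": "Alice"}})` yields `{"user.name": "Alice"}`, the same shape as the example in the list above.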
Available transforms:
- trim (remove whitespace)
- upper (uppercase)
- lower (lowercase)
- to_int (parse integer)
- to_float (parse float)
- to_bool (parse boolean)
- to_timestamp (ISO 8601 → Oracle TIMESTAMP)
- to_date (YYYY-MM-DD → Oracle DATE)
- During Wizard: Saved to component state on "Next" button (not real-time)
- After Creation: Mappings sent with bulk create request during task creation
- In TaskDetail: Updates trigger individual API calls with optimistic updates
- Save Template: Save current mappings to browser storage with custom name
- Load Template: Dropdown to apply saved templates
- Delete Template: Remove stored templates
- Scope: Per-browser, not shared (Phase 2 enhancement planned)
- Apply Transform: Select transform, apply to all string fields
- Auto-Match: Create mappings for fields matching DB column names (case-insensitive)
- Clear All: Reset all mappings (confirmation dialog)
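As a rough illustration of the transforms and batch operations above, the sketch below chains named transforms and auto-matches fields to columns case-insensitively. The registry, the truthy values accepted by `to_bool`, and the function names are assumptions; the real `mapper.py` may differ.

```python
from datetime import date, datetime
from typing import Any, Callable


def to_bool(value: Any) -> bool:
    # Assumption: these spellings count as true; everything else is false.
    return str(value).strip().lower() in {"true", "1", "yes", "y"}


TRANSFORMS: dict[str, Callable[[Any], Any]] = {
    "trim": lambda v: v.strip(),
    "upper": lambda v: v.upper(),
    "lower": lambda v: v.lower(),
    "to_int": int,
    "to_float": float,
    "to_bool": to_bool,
    "to_timestamp": datetime.fromisoformat,  # ISO 8601 string -> datetime
    "to_date": date.fromisoformat,           # YYYY-MM-DD string -> date
}


def apply_transforms(value: Any, names: list[str]) -> Any:
    """Apply transforms left to right, feeding each result into the next."""
    for name in names:
        value = TRANSFORMS[name](value)
    return value


def apply_to_all_strings(record: dict[str, Any], name: str) -> dict[str, Any]:
    """Batch operation: apply one transform to every string field."""
    return {
        k: TRANSFORMS[name](v) if isinstance(v, str) else v
        for k, v in record.items()
    }


def auto_match(api_fields: list[str], db_columns: list[str]) -> dict[str, str]:
    """Pair API fields with DB columns whose names match case-insensitively."""
    by_lower = {col.lower(): col for col in db_columns}
    return {f: by_lower[f.lower()] for f in api_fields if f.lower() in by_lower}
```

Keeping the transforms in a name-to-function table makes chaining a plain loop and keeps the list of available transforms in one place.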
Manual Paste Mode:
- User clicks "Fetch Sample"
- Modal opens with "Manual Paste" tab
- User pastes JSON response
- System attempts to parse JSON
- If invalid: Shows error message with issue description (line/column)
- User corrects and retries
- On success: Displays flattened fields in tree view
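The parse-and-report step in the manual paste flow above could be handled along these lines. `parse_pasted_sample` is a hypothetical name; the line and column come from the standard library's `json.JSONDecodeError`, which exposes `lineno`, `colno`, and `msg`.

```python
import json
from typing import Any, Optional


def parse_pasted_sample(text: str) -> tuple[Any, Optional[str]]:
    """Return (data, None) on success or (None, error message) on failure."""
    try:
        return json.loads(text), None
    except json.JSONDecodeError as exc:
        message = f"Invalid JSON at line {exc.lineno}, column {exc.colno}: {exc.msg}"
        return None, message
```

Returning the message instead of raising lets the modal show the problem and keep the pasted text in place for the user to correct.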
Auto-Fetch Mode:
- User clicks "Fetch Sample" → "Auto-Fetch" tab
- System makes test API call using task's configured endpoint/headers
- Extracts records using JSONPath
- Returns sample with flattened field structure
- Shows field types and sample values
Pydantic Issues: If Pydantic import/compatibility errors arise, fall back to alternative validation libraries:
| Library | Migration Effort | When to Use |
|---|---|---|
| Dataclasses | ~30-45 min | Default fallback (stdlib, no dependencies) |
| Attrs | ~1-2 hours | If you need rich validation + type hints |
| Marshmallow | ~2-3 hours | If you need deep SQLAlchemy integration |
Oracle Metadata Querying:
- Graceful degradation: If `USER_TAB_COLUMNS` query fails (permissions), show manual entry fallback
- Logs permission errors for debugging
Nested JSON Parsing:
- Handles null values, empty objects, primitive types consistently
- Doesn't flatten arrays (kept as-is for Phase 1)
- Arrays within objects preserved (Phase 2 enhancement: array explosion)
Unit Tests (backend/tests/unit/test_column_mappings.py):
- API endpoint validation
- Transform suggestions logic
- Oracle metadata querying
- JSON parsing with lenient error handling
- 15+ test cases
Integration Tests (backend/tests/integration/test_mapping_pipeline.py):
- End-to-end nested JSON flattening
- Multi-level nesting (3-4 levels)
- Mapping creation and application
- Transform chaining
- 8+ test cases
Frontend Tests (frontend/src/__tests__/components/ColumnMappingEditor.test.tsx):
- Field preview rendering
- Tree view expansion/collapse
- Mapping CRUD operations
- Transform selection
- Template save/load
- Auto-suggest transform triggers
- 10+ test cases
Frontend Integration Tests (frontend/src/__tests__/pages/TaskWizard-Mapping.test.tsx):
- Wizard step 4.5 navigation
- Validation blocking without mappings
- Warning with unmapped fields
- State persistence
- Skip for now functionality
- 8+ test cases
Total Target: 25+ new tests
- Step 1: Backend column mapping API + schemas (2-3 hours)
- Step 2: Oracle metadata service (1-2 hours)
- Step 3: Transform suggester service (1 hour)
- Step 4: API connector sample fetching (1-2 hours)
- Step 5: Frontend types & hooks (1-2 hours)
- Step 6: ColumnMappingEditor component (4-5 hours)
- Step 7: TaskWizard step 4.5 (2-3 hours)
- Step 8: TaskDetail mapping tab (2-3 hours)
- Step 9: Testing (3-4 hours)

Total: ~17-25 hours development time
Phase 1 (Current):
- ✅ Nested JSON flattening
- ✅ Type-aware mapping with auto-suggestions
- ✅ Mapping templates (localStorage only)
- ✅ Batch operations
- ✅ Hierarchical field display
Phase 2 (Future):
- ⏳ Array element mapping (index access: `tags.0`, `tags.1`)
- ⏳ Array explosion (one row per element)
- ⏳ Shared mapping templates (database storage)
- ⏳ Mapping import/export (JSON file)
- ⏳ Advanced validation (field existence checks)
- ⏳ Drag-and-drop field mapping UI
Not Planned:
- Complex JSONPATH transformations (out of scope)
- Real-time field preview updates (performance concern)
Phase 7 adds two critical production features:
- Flexible API Authentication: Support for Bearer tokens, API keys, Basic Auth, and OAuth for external API data fetching
- Task Scheduler UI: Complete frontend interface for creating and managing cron-based task schedules with enhanced retry logic
Database Schema Changes:
ALTER TABLE task ADD auth_type VARCHAR2(20) NULL; -- 'none', 'bearer', 'api_key', 'basic', 'oauth'
ALTER TABLE task ADD api_key VARCHAR2(500) NULL; -- Encrypted
ALTER TABLE task ADD username VARCHAR2(200) NULL; -- For Basic auth
ALTER TABLE task ADD password VARCHAR2(200) NULL; -- Encrypted
ALTER TABLE task ADD oauth_config CLOB NULL; -- JSON for OAuth

New Backend Components:
- `backend/app/core/encryption.py` - Fernet encryption service for credentials
- `backend/app/services/api_connector.py` - Enhanced with `apply_authentication()` function
- Authentication types supported:
  - None: No authentication (default)
  - Bearer: Adds `Authorization: Bearer {token}` header
  - API Key: Adds custom header (e.g., `X-API-Key: {key}`)
  - Basic Auth: Adds `Authorization: Basic {base64(username:password)}`
  - OAuth: Token refresh flow (Phase 7.5)
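A simplified version of the header logic for the authentication types above might look like the following. `build_auth_headers` is a hypothetical stand-in, not the real `apply_authentication()` signature; it assumes credentials are already decrypted and that an OAuth access token is sent as a Bearer header.

```python
import base64
from typing import Any, Optional


def build_auth_headers(
    auth_type: Optional[str],
    *,
    token: Optional[str] = None,
    header_name: str = "X-API-Key",
    username: Optional[str] = None,
    password: Optional[str] = None,
    oauth_config: Optional[dict[str, Any]] = None,
) -> dict[str, str]:
    """Return the HTTP headers to add for the given auth type."""
    if auth_type in (None, "none"):
        return {}
    if auth_type == "bearer":
        return {"Authorization": f"Bearer {token}"}
    if auth_type == "api_key":
        return {header_name: str(token)}
    if auth_type == "basic":
        raw = f"{username}:{password}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
    if auth_type == "oauth":
        access_token = (oauth_config or {}).get("access_token")
        if not access_token:
            # Fail fast instead of sending an unauthenticated request.
            raise ValueError("oauth_config is missing access_token")
        return {"Authorization": f"Bearer {access_token}"}
    raise ValueError(f"unsupported auth_type: {auth_type!r}")
```

Rejecting unknown auth types outright keeps a typo in the task configuration from turning into an unauthenticated call.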
Frontend Components:
- TaskWizard: New "Authentication" step with auth type dropdown
- Conditional input fields based on auth type selection
- Password masking for sensitive credentials
New Backend API Routes (backend/app/api/v1/routes/schedules.py):
- `POST /api/v1/tasks/{task_id}/schedule` - Create schedule
- `GET /api/v1/tasks/{task_id}/schedule` - Get task schedule
- `PUT /api/v1/schedules/{schedule_id}` - Update schedule
- `DELETE /api/v1/schedules/{schedule_id}` - Delete schedule
- `GET /api/v1/schedules/` - List all schedules
- `POST /api/v1/schedules/{schedule_id}/resume` - Resume paused schedule

Frontend Components:
- `ScheduleEditor.tsx` - Cron expression editor with presets and validation
- `Schedules.tsx` - List page showing all schedules across tasks
- TaskDetail integration - Schedule tab for per-task configuration
- TaskList indicators - Clock icon badges for scheduled tasks
Cron Presets:
- Hourly: `0 * * * *`
- Daily at 2 AM: `0 2 * * *`
- Weekly (Sunday 2 AM): `0 2 * * 0`
- Monthly (1st at 2 AM): `0 2 1 * *`
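The presets above are ordinary five-field cron expressions (minute, hour, day of month, month, day of week). The sketch below stores them in a table and applies a deliberately simplified check that accepts only `*` or a single in-range integer per field; ranges, lists, and step values are out of scope here, and the preset keys and `is_valid_cron` are hypothetical names.

```python
CRON_PRESETS = {
    "hourly": "0 * * * *",
    "daily_2am": "0 2 * * *",
    "weekly_sunday_2am": "0 2 * * 0",
    "monthly_1st_2am": "0 2 1 * *",
}

# (min, max) per field: minute, hour, day of month, month, day of week
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def is_valid_cron(expr: str) -> bool:
    """Simplified check: five fields, each '*' or one in-range integer."""
    fields = expr.split()
    if len(fields) != len(FIELD_RANGES):
        return False
    for field, (low, high) in zip(fields, FIELD_RANGES):
        if field == "*":
            continue
        if not (field.isascii() and field.isdigit()):
            return False
        if not low <= int(field) <= high:
            return False
    return True
```

A production validator would also need to accept the fuller cron syntax that the scheduler supports.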
Discriminated Retry (Celery task):
@celery_app.task(
    autoretry_for=(
        httpx.NetworkError,
        httpx.TimeoutException,
        httpx.ConnectError,
    ),  # Only retry network issues, not validation errors
)
Retry on Status Code:
- ✅ Retry: 5xx server errors (503, 500, 502)
- ❌ No Retry: 4xx client errors (400, 401, 404) - these indicate config issues
- ❌ No Retry: Validation errors - data quality issues, not transient
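The status-code rule above reduces to a one-line check. The function name is hypothetical; the sketch only restates the 5xx versus 4xx split described in the list.

```python
def should_retry_status(status_code: int) -> bool:
    """Retry only 5xx responses; a 4xx points at the request or its config."""
    return 500 <= status_code <= 599
```

A range check is used instead of an explicit list so that less common server errors such as 504 are treated the same way as 500, 502 and 503.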
Schedule-Level Retry Configuration (TaskSchedule model):
max_retries = Column(Integer, default=3)
consecutive_failures = Column(Integer, default=0)
status = Column(String(30), default='active')  # 'active', 'paused_by_failures', 'disabled'
Auto-Pause Logic:
- Track consecutive failures per schedule
- After N failures (configurable), set status to paused_by_failures
- Requires manual resume via /schedules/{id}/resume endpoint
- Prevents endless retries of broken configurations
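A minimal sketch of the auto-pause bookkeeping described above. ScheduleState is a stand-in for the TaskSchedule columns, and using max_retries as the failure threshold is an assumption; the source only says the threshold is configurable.

```python
from dataclasses import dataclass


@dataclass
class ScheduleState:
    """Stand-in for the TaskSchedule columns involved in auto-pause."""
    max_retries: int = 3
    consecutive_failures: int = 0
    status: str = "active"


def record_run_outcome(schedule: ScheduleState, succeeded: bool) -> ScheduleState:
    """Update the failure counter after a run and pause at the threshold."""
    if succeeded:
        schedule.consecutive_failures = 0
        return schedule
    schedule.consecutive_failures += 1
    # Assumption: the threshold N is the schedule's max_retries value.
    if schedule.consecutive_failures >= schedule.max_retries:
        schedule.status = "paused_by_failures"
    return schedule


def resume(schedule: ScheduleState) -> ScheduleState:
    """Manual resume: reactivate and clear the counter."""
    schedule.status = "active"
    schedule.consecutive_failures = 0
    return schedule
```

Resetting the counter on every success means only an unbroken run of failures pauses the schedule, which matches the "consecutive" wording.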
- Encryption at Rest: All API keys and passwords encrypted using Fernet symmetric encryption
- Key Management: Encryption key stored in environment variable ENCRYPTION_KEY
- API Response Filtering: TaskOut schema excludes sensitive fields (api_key, password)
- Key Rotation: Encryption service supports key rotation for compliance
User creates task with auth → Credentials encrypted → Stored in Task table
↓
Task execution triggered
↓
api_connector.py retrieves Task
↓
apply_authentication() decrypts & formats
↓
httpx.request() includes auth headers
↓
External API authenticates request
User creates schedule in UI → POST /tasks/{id}/schedule → Validate cron expression
↓
Store in task_schedule table
↓
Call get_scheduler().add_schedule()
↓
APScheduler adds cron job
↓
Job triggers → Enqueue Celery task → Track success/failure
↓
Update consecutive_failures counter
↓
If threshold exceeded → Pause schedule + alert
Track A: Authentication (100% complete):
- Database migration (auth fields)
- Encryption service
- API connector auth logic
- Pydantic schema updates
- Frontend auth UI
- Unit tests
Track B: Scheduler UI (100% complete):
- Schedule API routes (5 endpoints)
- Pydantic schemas with cron validation
- Frontend types & API client
- React Query hooks
- ScheduleEditor component
- TaskDetail integration
- Schedules list page
- TaskList indicators
- Tests
Track C: Enhanced Retry (100% complete):
- Discriminate retryable errors
- Schedule retry configuration
- Failure tracking & auto-pause
- Resume paused schedules
Backend Tests (25+ cases planned):
- test_authentication.py: Bearer token formatting, API key injection, Basic auth encoding, encryption round-trip
- test_schedule_routes.py: CRUD operations, cron validation, scheduler reload
- test_enhanced_retry.py: Network error retry, 5xx retry, 4xx no-retry, consecutive failure tracking
Frontend Tests (15+ cases planned):
- ScheduleEditor.test.tsx: Cron validation, preset selection, next run calculation
- Schedules.test.tsx: List rendering, filter, edit/delete actions
- AuthenticationStep.test.tsx: Auth type dropdown, conditional fields, password masking
- Encryption Key Storage: Use environment variable, not code (12factor.net principle)
- HTTPS Requirement: Document that production must use HTTPS to prevent MITM attacks
- OAuth Refresh Tokens: Store encrypted, implement rotation (Phase 7.5)
- Audit Trail: Log schedule creates/updates/deletes (add user context when auth implemented)
- Rate Limiting: Consider adding to prevent credential stuffing attacks (Phase 8)
Phase 7 (Current scope):
- ✅ Bearer, API Key, Basic Auth support
- ✅ OAuth framework (token refresh in Phase 7.5)
- ✅ Cron-based scheduling with validation
- ✅ Auto-pause on consecutive failures
- ✅ Manual resume paused schedules
Phase 7.5 (Future):
- ⏳ OAuth provider integration (Google, GitHub, Azure AD)
- ⏳ Schedule conflict detection (prevent overlapping runs)
- ⏳ Visual cron builder (drag-and-drop time picker)
- ⏳ Email/Slack notifications for schedule failures
- ⏳ "Test Connection" button in TaskWizard
Phase 8 (Not planned):
- Certificate-based authentication (mTLS)
- Multi-factor authentication for UI
- Schedule templates (export/import)
- Advanced retry strategies (jitter, circuit breaker)
Phase 8 focuses on production-readiness and user experience improvements:
- Database Connection Configuration UI - Move DB settings from .env to secure admin page
- WebSocket Real-time Updates - Live run status and log streaming
- Visual Cron Builder - User-friendly schedule creation
- Mobile-Responsive UI - Touch-friendly interface for all screen sizes
- Upsert Logic - Insert or update records based on unique keys
Threat Model:
- Credentials must never be exposed in API responses
- Credentials must be encrypted at rest
- Only authorized admins can modify connections
- Connection strings must be validated before saving
- Audit trail for all configuration changes
Storage Strategy (Encrypted File):
┌─────────────────────────────────────────────────────────────────┐
│ Encrypted File Storage (Recommended) │
├─────────────────────────────────────────────────────────────────┤
│ Default Location: connections.enc │
│ Override: CONNECTIONS_FILE_PATH │
│ Recommended Production Value: /etc/intakegateway/connections.enc │
│ Encryption: Fernet symmetric encryption │
│ Master Key: ENCRYPTION_KEY environment variable │
└─────────────────────────────────────────────────────────────────┘
File Structure (JSON, encrypted at rest):
{
"connections": [
{
"id": "uuid-1",
"name": "Production Oracle",
"host": "db.example.com",
"port": 1521,
"service": "ORCL",
"username": "intakegateway_user",
"password": "encrypted_password_here",
"is_default": true,
"created_at": "2026-02-03T10:00:00Z",
"updated_at": "2026-02-03T10:00:00Z"
}
],
"metadata": {
"version": 1,
"last_modified_by": "admin"
}
}
Benefits:
- Simpler for single-instance deployments
- No database dependency for connection config
- Easy backup/restore (copy encrypted file)
- Works during initial setup (before DB is configured)
- Auto-recreated when the first connection is saved after deletion
- Missing or unreadable files degrade to an empty list instead of breaking the app
Security Measures:
- File permissions: 600 (owner read/write only)
- Directory permissions: 700
- Encryption key never stored in file
- File integrity check via HMAC
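The HMAC integrity check can be illustrated with the standard library alone. This is a sketch under assumptions: the function names are hypothetical and key handling is simplified. Fernet tokens already carry an HMAC-SHA256 signature, so a separate tag like this would only be needed for data stored outside a Fernet token.

```python
import hashlib
import hmac


def sign(payload: bytes, key: bytes) -> bytes:
    """Compute an HMAC-SHA256 tag over the file contents."""
    return hmac.new(key, payload, hashlib.sha256).digest()


def verify(payload: bytes, tag: bytes, key: bytes) -> bool:
    """Check the tag in constant time to avoid timing side channels."""
    return hmac.compare_digest(sign(payload, key), tag)
```

Any change to the payload or the use of a different key makes verify() return False, which is the signal to treat the file as tampered with or corrupted.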
Backend Components:
backend/app/
├── api/v1/routes/
│ └── connections.py # Connection CRUD endpoints
├── services/
│ ├── connection_pool.py # Destination connection pool management
│ └── connection_storage.py # Encrypted file read/write operations
├── db/
│ └── schemas/
│ └── connection.py # Pydantic schemas (no password in response)
└── core/
└── encryption.py # Enhanced Fernet encryption (existing)
Connection File Service:
# backend/app/services/connection_storage.py
import json
import os
from pathlib import Path

from app.core.encryption import encrypt_data, decrypt_data

DEFAULT_CONFIG_PATH = os.getenv("CONNECTIONS_FILE_PATH", "connections.enc")


class ConnectionFileService:
    def __init__(self, config_path: str = DEFAULT_CONFIG_PATH):
        self.config_path = Path(config_path)

    def read_connections(self) -> dict:
        """Read and decrypt connections file."""
        if not self.config_path.exists():
            return {"connections": [], "active_connection_id": None, "version": 1}
        encrypted_data = self.config_path.read_bytes()
        decrypted_json = decrypt_data(encrypted_data)
        return json.loads(decrypted_json)

    def write_connections(self, data: dict, modified_by: str = "system"):
        """Encrypt and write connections file."""
        # The empty default from read_connections() has no "metadata" key,
        # so create it on demand instead of raising KeyError.
        data.setdefault("metadata", {})["last_modified_by"] = modified_by
        json_data = json.dumps(data, indent=2)
        encrypted_data = encrypt_data(json_data.encode())
        # Ensure directory exists with secure permissions
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        os.chmod(self.config_path.parent, 0o700)
        # Write file with secure permissions
        self.config_path.write_bytes(encrypted_data)
        os.chmod(self.config_path, 0o600)
API Endpoints:
GET /api/v1/connections # List connections (passwords masked)
POST /api/v1/connections # Create connection
PUT /api/v1/connections/{id} # Update connection
DELETE /api/v1/connections/{id} # Delete connection
POST /api/v1/connections/{id}/test # Test connection (returns success/error)
POST /api/v1/connections/{id}/activate # Set as active connection
Security Best Practices:
- Never return passwords - API responses show password: "********"
- Encrypt at rest - Use Fernet symmetric encryption (same as Phase 7)
- Validate before save - Test connection before persisting
- Rate limit test endpoint - Prevent brute force attacks
- Audit logging - Log all connection changes with timestamp/user
- Require re-authentication - Prompt for current password before changes
- Environment fallback - If no DB config exists, fall back to .env (migration path)
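The "never return passwords" rule can be sketched as a masking step applied before a connection is serialized. The helper name and the field tuple are assumptions; the real project does this through its Pydantic response schemas.

```python
MASK = "********"
SENSITIVE_FIELDS = ("password",)  # assumption: extend if more secrets are stored


def mask_connection(connection: dict) -> dict:
    """Return a copy that is safe to send in an API response."""
    masked = dict(connection)  # shallow copy; the stored record is untouched
    for field in SENSITIVE_FIELDS:
        if masked.get(field) is not None:
            masked[field] = MASK
    return masked
```

Working on a copy matters here: masking the stored record in place would overwrite the real credential the next time the file is saved.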
Frontend Components:
frontend/src/
├── pages/
│ └── Settings.tsx # Settings page with connections tab
├── components/
│ └── ConnectionEditor.tsx # Connection form with test button
Migration Strategy:
- App starts → Check if db_connections table has entries
- If empty → Read from .env → Create default connection → Mark as active
- Future runs → Use database configuration
- .env becomes backup/override for emergencies
Technology Choice: FastAPI WebSocket + React useWebSocket hook
WebSocket Events:
// Server → Client events
interface WSEvent {
type: 'run_status' | 'run_progress' | 'run_log' | 'schedule_triggered';
payload: RunStatusPayload | RunProgressPayload | RunLogPayload | SchedulePayload;
}
interface RunStatusPayload {
run_id: string;
status: 'PENDING' | 'RUNNING' | 'SUCCESS' | 'PARTIAL_SUCCESS' | 'FAILED';
completed_at?: string;
}
interface RunProgressPayload {
run_id: string;
total_records: number;
processed_records: number;
failed_records: number;
percentage: number;
}
interface RunLogPayload {
run_id: string;
timestamp: string;
level: 'INFO' | 'WARNING' | 'ERROR';
message: string;
}
interface SchedulePayload {
schedule_id: string;
task_id: string;
task_name: string;
triggered_at: string;
}
Backend Components:
backend/app/
├── api/v1/
│ └── websocket.py # WebSocket endpoint handler
├── services/
│ └── ws_manager.py # Connection manager (broadcast, rooms)
└── workers/
└── tasks.py # Modified to emit WS events
WebSocket Endpoint:
# /api/v1/ws
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            # Keep connection alive, handle client messages
            data = await websocket.receive_text()
            # Handle subscription to specific runs
    except WebSocketDisconnect:
        manager.disconnect(websocket)
Frontend Integration:
// hooks/useWebSocket.ts
const useRunUpdates = (runId: string) => {
  const { lastMessage } = useWebSocket(`ws://localhost:8000/api/v1/ws`);
  useEffect(() => {
    if (lastMessage?.type === 'run_progress' && lastMessage.payload.run_id === runId) {
      // Update React Query cache
      queryClient.setQueryData(['run', runId], (old) => ({
        ...old,
        ...lastMessage.payload
      }));
    }
    // runId is a dependency so the effect re-evaluates when the viewed run changes
  }, [lastMessage, runId]);
};
Use Cases:
- RunDetail page - Live progress bar, streaming logs
- RunsList page - Status badges update automatically
- Dashboard - Recent runs list updates in real-time
- Toast notifications - "Task X completed successfully"
Component Structure:
┌─────────────────────────────────────────────────────────────────┐
│ Schedule Configuration │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Quick Presets │ │
│ │ [Every Hour] [Daily 2AM] [Weekly Sun] [Monthly 1st] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Frequency [Dropdown: Hourly/Daily/Weekly/Monthly] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Time Picker [ 02 ▼ ] : [ 00 ▼ ] (24-hour format) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Days of Week (for Weekly) │ │
│ │ [Mon] [Tue] [Wed] [Thu] [Fri] [Sat] [Sun] │ │
│ │ ○ ○ ○ ○ ○ ○ ● │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Day of Month (for Monthly) │ │
│ │ [ 1 ▼ ] or [Last day of month ☐] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Generated Cron: 0 2 * * 0 │ │
│ │ Human readable: "Every Sunday at 2:00 AM" │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Next 5 Runs: │ │
│ │ • Sun, Feb 8, 2026 at 2:00 AM │ │
│ │ • Sun, Feb 15, 2026 at 2:00 AM │ │
│ │ • Sun, Feb 22, 2026 at 2:00 AM │ │
│ │ • Sun, Mar 1, 2026 at 2:00 AM │ │
│ │ • Sun, Mar 8, 2026 at 2:00 AM │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Advanced: Edit cron directly │ │
│ │ [ 0 2 * * 0 ] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
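The "Next 5 Runs" panel needs upcoming execution times for a weekly schedule. A minimal sketch of that calculation using only the standard library follows; the function name is hypothetical, time zones are ignored, and the real backend would more likely delegate to its scheduler library.

```python
from datetime import datetime, timedelta


def next_weekly_runs(after: datetime, cron_dow: int, hour: int, minute: int,
                     count: int = 5) -> list:
    """List the next `count` weekly run times strictly after `after`.

    cron_dow uses cron numbering: 0 = Sunday ... 6 = Saturday.
    """
    # Python's weekday() is Monday=0 ... Sunday=6, so shift the cron value.
    target = (cron_dow - 1) % 7
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(target - candidate.weekday()) % 7)
    if candidate <= after:
        # Today's slot has already passed; move to next week.
        candidate += timedelta(days=7)
    return [candidate + timedelta(weeks=i) for i in range(count)]
```

Calling next_weekly_runs(datetime(2026, 2, 3, 10, 0), cron_dow=0, hour=2, minute=0) corresponds to the cron expression 0 2 * * 0 evaluated on a Tuesday morning.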
Frontend Components:
frontend/src/components/
├── CronBuilder/
│ ├── index.tsx # Main container
│ ├── FrequencySelector.tsx # Hourly/Daily/Weekly/Monthly
│ ├── TimePicker.tsx # Hour:Minute selector
│ ├── DayOfWeekPicker.tsx # Toggle buttons for weekdays
│ ├── DayOfMonthPicker.tsx # Dropdown 1-31 + last day
│ ├── CronPreview.tsx # Shows cron + human readable
│ ├── NextRunsList.tsx # Upcoming execution dates
│ └── utils.ts # Cron generation/parsing logic
Cron Generation Logic:
interface CronConfig {
frequency: 'hourly' | 'daily' | 'weekly' | 'monthly';
hour: number; // 0-23
minute: number; // 0-59
daysOfWeek: number[]; // 0-6 (Sun-Sat)
dayOfMonth: number | 'last';
}
function generateCron(config: CronConfig): string {
const { frequency, hour, minute, daysOfWeek, dayOfMonth } = config;
switch (frequency) {
case 'hourly':
return `${minute} * * * *`;
case 'daily':
return `${minute} ${hour} * * *`;
case 'weekly':
return `${minute} ${hour} * * ${daysOfWeek.join(',')}`;
case 'monthly':
const dom = dayOfMonth === 'last' ? 'L' : dayOfMonth;
return `${minute} ${hour} ${dom} * *`;
}
}
Backend Support:
# New endpoint to calculate next runs
GET /api/v1/schedules/preview?cron=0+2+*+*+0&count=5
Response:
{
"cron": "0 2 * * 0",
"human_readable": "Every Sunday at 2:00 AM",
"next_runs": [
"2026-02-08T02:00:00Z",
"2026-02-15T02:00:00Z",
...
]
}
/* Tailwind breakpoints */
sm: 640px /* Mobile landscape */
md: 768px /* Tablet */
lg: 1024px /* Desktop */
xl: 1280px /* Large desktop */
Sidebar Navigation:
Desktop (lg+): Mobile (< lg):
┌────┬──────────────┐ ┌──────────────────┐
│ ☰ │ │ │ ☰ IntakeGateway [≡] │ <- Hamburger menu
│ │ │ ├──────────────────┤
│ 📊 │ Content │ │ │
│ 📋 │ │ │ Content │
│ ▶️ │ │ │ │
│ 📅 │ │ │ │
└────┴──────────────┘ └──────────────────┘
Mobile menu (slide-in):
┌──────────────────┐
│ ✕ Close │
├──────────────────┤
│ 📊 Dashboard │
│ 📋 Tasks │
│ ▶️ Runs │
│ 📅 Schedules │
│ ⚙️ Settings │
└──────────────────┘
Data Tables:
Desktop: Mobile (card layout):
┌────┬──────┬────────┬──────┐ ┌──────────────────┐
│ ID │ Name │ Status │ Act │ │ Import Users │
├────┼──────┼────────┼──────┤ │ Status: ● Active │
│ 1 │ Task │ Active │ [▶] │ │ Last run: 2h ago │
└────┴──────┴────────┴──────┘ │ [▶ Run] [Edit] │
└──────────────────┘
┌──────────────────┐
│ Sync Products │
│ ... │
└──────────────────┘
Forms:
- Stack form fields vertically on mobile
- Full-width inputs
- Larger touch targets (min 44px height)
- Floating action buttons for primary actions
Implementation Checklist:
- Collapsible sidebar with hamburger menu
- Responsive table → card layout switcher
- Touch-friendly button sizes (min 44x44px)
- Swipe gestures for card actions (optional)
- Bottom navigation bar for mobile (optional)
- Viewport meta tag already set
- Test on iOS Safari, Android Chrome
Key Files to Modify:
frontend/src/
├── components/
│ ├── layout/
│ │ ├── Sidebar.tsx # Add mobile hamburger
│ │ └── MobileNav.tsx # New slide-in menu
│ └── ui/
│ └── ResponsiveTable.tsx # Table/Card switcher
├── pages/
│ ├── TaskList.tsx # Use ResponsiveTable
│ ├── RunsList.tsx # Use ResponsiveTable
│ └── Schedules.tsx # Use ResponsiveTable
└── index.css # Mobile-first utilities
Enable tasks to update existing records (if unique key matches) or insert new ones, with intelligent row skipping for already-processed records and graceful error handling.
Task Model Enhancement:
ALTER TABLE tasks ADD upsert_enabled NUMBER(1) DEFAULT 0;
ALTER TABLE tasks ADD upsert_keys VARCHAR2(500); -- JSON array of column names
ALTER TABLE tasks ADD skip_column VARCHAR2(100); -- Column to check for skip condition
ALTER TABLE tasks ADD skip_value VARCHAR2(100); -- Value that triggers skip (e.g., 'Y')
-- Example: skip_column = "processed", skip_value = "Y"
Use Case: A third-party system processes records and marks them with processed = 'Y'. The import should skip these rows to avoid overwriting changes.
Skip Conditions:
- Pre-processed rows: If skip_column has skip_value, skip the row
- Primary key errors: Log error, skip row, continue processing
- Constraint violations: Log error, skip row, continue processing
Skip Flow Diagram:
For each record in API response:
│
├─► Check if record exists in DB (by upsert_keys)
│ │
│ ├─► EXISTS + skip_column = skip_value
│ │ └─► SKIP (log: "Row skipped - already processed")
│ │
│ ├─► EXISTS + skip_column ≠ skip_value
│ │ └─► UPDATE record
│ │
│ └─► NOT EXISTS
│ └─► INSERT record
│
├─► On PRIMARY KEY error
│ └─► LOG error + SKIP + CONTINUE
│
└─► On CONSTRAINT error
└─► LOG error + SKIP + CONTINUE
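The three branches of the flow above can be expressed as a small pure function. This is a sketch: the function name is hypothetical and the existing record is modelled as a plain dict (or None when no row matches the upsert keys).

```python
def decide_action(existing, skip_column=None, skip_value=None) -> str:
    """Return "insert", "skip" or "update" for one incoming record."""
    if existing is None:
        return "insert"
    if skip_column and skip_value is not None:
        if existing.get(skip_column) == skip_value:
            return "skip"  # already processed by the third-party system
    return "update"
```

Keeping the decision separate from the database calls makes each branch testable without a connection; error handling (the last two branches of the diagram) stays in the caller.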
Strategy 1: MERGE Statement with Skip Logic (Oracle)
MERGE INTO target_table t
USING (SELECT :employee_id AS employee_id, :col2 AS col2 FROM dual) s
ON (t.employee_id = s.employee_id)
WHEN MATCHED THEN
    UPDATE SET t.col2 = s.col2, t.updated_at = SYSDATE
    WHERE t.processed IS NULL OR t.processed != 'Y' -- Skip if already processed
WHEN NOT MATCHED THEN
    INSERT (employee_id, col2, created_at) VALUES (s.employee_id, s.col2, SYSDATE);
Strategy 2: Check-Skip-then-Insert/Update (With Error Handling)
# For databases without MERGE support or complex skip logic
existing = session.query(Model).filter_by(unique_key=value).first()
if existing:
    if existing.processed == 'Y':
        return RowStatus.SKIPPED  # Already processed by third party
    for key, val in data.items():
        setattr(existing, key, val)
else:
    session.add(Model(**data))
Enhanced Runner Service:
# backend/app/services/runner.py
import json
from dataclasses import dataclass
from enum import Enum

from loguru import logger
from sqlalchemy.exc import DatabaseError, IntegrityError


class RowStatus(Enum):
    INSERTED = "inserted"
    UPDATED = "updated"
    SKIPPED = "skipped"
    ERROR = "error"


@dataclass
class RowResult:
    status: RowStatus
    record_key: str
    message: str = ""


class TaskRunner:
    def process_records(self, task: Task, records: list[dict]) -> dict:
        """Process records with skip logic and error continuation."""
        results = {
            "inserted": 0,
            "updated": 0,
            "skipped": 0,
            "errors": 0,
            "error_details": []
        }
        for idx, record in enumerate(records):
            try:
                result = self._process_single_record(task, record)
                if result.status == RowStatus.ERROR:
                    # The counter key is "errors", not the enum value "error"
                    results["errors"] += 1
                    results["error_details"].append({
                        "row_index": idx,
                        "record_key": result.record_key,
                        "error": result.message
                    })
                else:
                    results[result.status.value] += 1
            except Exception as e:
                # Catch-all: log and continue to next record
                logger.error(f"Unexpected error processing row {idx}: {e}")
                results["errors"] += 1
                results["error_details"].append({
                    "row_index": idx,
                    "error": str(e)
                })
                continue  # NEVER stop the process
        return results

    def _process_single_record(self, task: Task, record: dict) -> RowResult:
        """Process a single record with skip and error handling."""
        upsert_keys = json.loads(task.upsert_keys) if task.upsert_keys else []
        record_key = self._get_record_key(record, upsert_keys)
        try:
            # Check if record exists
            if upsert_keys:
                existing = self._find_existing_record(task, record, upsert_keys)
                if existing:
                    # Check skip condition
                    if self._should_skip(task, existing):
                        logger.info(f"Skipping row {record_key}: already processed")
                        return RowResult(
                            status=RowStatus.SKIPPED,
                            record_key=record_key,
                            message=f"Skip condition met: {task.skip_column}={task.skip_value}"
                        )
                    # Update existing record
                    self._update_record(task, existing, record)
                    return RowResult(status=RowStatus.UPDATED, record_key=record_key)
            # Insert new record
            self._insert_record(task, record)
            return RowResult(status=RowStatus.INSERTED, record_key=record_key)
        except IntegrityError as e:
            # Primary key or unique constraint violation
            # (IntegrityError subclasses DatabaseError, so it must be caught first)
            logger.warning(f"Constraint error for row {record_key}: {e}")
            self.session.rollback()
            return RowResult(
                status=RowStatus.ERROR,
                record_key=record_key,
                message=f"Constraint violation: {str(e)[:200]}"
            )
        except DatabaseError as e:
            # Other database errors
            logger.error(f"Database error for row {record_key}: {e}")
            self.session.rollback()
            return RowResult(
                status=RowStatus.ERROR,
                record_key=record_key,
                message=f"Database error: {str(e)[:200]}"
            )

    def _should_skip(self, task: Task, existing_record) -> bool:
        """Check if record should be skipped based on skip_column/skip_value."""
        if not task.skip_column or not task.skip_value:
            return False
        current_value = getattr(existing_record, task.skip_column, None)
        return str(current_value).upper() == str(task.skip_value).upper()

    def _get_record_key(self, record: dict, upsert_keys: list) -> str:
        """Generate a readable key for logging."""
        if upsert_keys:
            return ", ".join(f"{k}={record.get(k)}" for k in upsert_keys)
        return f"row_{id(record)}"

    def _build_merge_sql(self, task: Task, columns: list, upsert_keys: list) -> str:
        """Generate Oracle MERGE statement with skip condition."""
        update_cols = [c for c in columns if c not in upsert_keys]
        # Build WHERE clause for skip condition.
        # The skip value is passed as the :skip_value bind parameter rather than
        # interpolated, so the caller must include it in the bind dict.
        skip_where = ""
        if task.skip_column and task.skip_value:
            skip_where = f"WHERE (t.{task.skip_column} IS NULL OR t.{task.skip_column} != :skip_value)"
        return f"""
        MERGE INTO {task.table_name} t
        USING (SELECT {', '.join(f':{c} as {c}' for c in columns)} FROM dual) s
        ON ({' AND '.join(f't.{k} = s.{k}' for k in upsert_keys)})
        WHEN MATCHED THEN
            UPDATE SET {', '.join(f't.{c} = s.{c}' for c in update_cols)}
            {skip_where}
        WHEN NOT MATCHED THEN
            INSERT ({', '.join(columns)}) VALUES ({', '.join(f's.{c}' for c in columns)})
        """
Key Principle: The process should NEVER stop due to individual row errors.
| Error Type | Action | Logged |
|---|---|---|
| Skip condition met | Skip row, continue | INFO |
| Primary key violation | Skip row, continue | WARNING |
| Unique constraint violation | Skip row, continue | WARNING |
| Data type mismatch | Skip row, continue | WARNING |
| Foreign key violation | Skip row, continue | WARNING |
| Connection lost | Retry 3x, then fail run | ERROR |
| Table not found | Fail run immediately | ERROR |
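One way to apply the table above is to map Oracle error codes to an action. The sketch below is illustrative only: the function name, the action labels and the choice of codes are assumptions, and treating unknown codes as row-level errors mirrors the catch-all behaviour rather than a documented rule.

```python
import re

# Standard Oracle error codes chosen to illustrate each row of the table.
ACTION_BY_CODE = {
    "ORA-00001": "skip_row",         # unique constraint violated
    "ORA-01722": "skip_row",         # invalid number (data type mismatch)
    "ORA-02291": "skip_row",         # parent key not found (foreign key)
    "ORA-03113": "retry_then_fail",  # end-of-file on communication channel
    "ORA-03114": "retry_then_fail",  # not connected to ORACLE
    "ORA-00942": "fail_run",         # table or view does not exist
}


def classify_error(message: str) -> str:
    """Pick the action for an error message; unknown codes skip the row."""
    match = re.search(r"ORA-\d{5}", message)
    code = match.group(0) if match else None
    return ACTION_BY_CODE.get(code, "skip_row")
```

Only connection-level and schema-level failures escalate beyond the row, which keeps the "never stop on individual row errors" principle intact.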
Error Log Entry Example:
{
"run_id": "abc-123",
"row_index": 42,
"record_key": "employee_id=12345",
"error_type": "CONSTRAINT_VIOLATION",
"error_message": "ORA-00001: unique constraint (EMPLOYEES_EMAIL_UK) violated",
"timestamp": "2026-02-03T14:30:00Z",
"action_taken": "SKIPPED"
}
TaskWizard Step Enhancement:
┌─────────────────────────────────────────────────────────────────┐
│ Step 5: Database Options │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Insert Mode: │
│ ○ Insert only (fail on duplicates) │
│ ● Upsert (update if exists, insert if new) │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Unique Key Columns (for upsert matching): │ │
│ │ │ │
│ │ Available Columns: Selected Keys: │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ name │ [>] │ employee_id │ │ │
│ │ │ department │ [<] │ │ │ │
│ │ │ salary │ │ │ │ │
│ │ │ hire_date │ │ │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Skip Already Processed Records (Optional): │ │
│ │ │ │
│ │ Skip Column: [ processed ▼ ] │ │
│ │ Skip Value: [ Y ] │ │
│ │ │ │
│ │ ℹ️ Rows where this column equals this value will be │ │
│ │ skipped during import (useful when third-party │ │
│ │ systems mark records as processed) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ☑️ Continue on row errors (log and skip failed rows) │
│ │
│ ⚠️ Upsert keys should match unique/primary key constraints │
│ │
└─────────────────────────────────────────────────────────────────┘
TaskDetail Enhancement:
- Show upsert configuration in task summary
- Display "Upsert Mode: ON (key: employee_id)" badge
- Display "Skip: processed=Y" badge when configured
- Track statistics: X inserted, Y updated, Z skipped, N errors
# TaskRun model additions
class TaskRun:
    # Existing fields...
    inserted_records = Column(Integer, default=0)
    updated_records = Column(Integer, default=0)
    skipped_records = Column(Integer, default=0)  # Rows skipped due to skip condition
    error_records = Column(Integer, default=0)  # Rows skipped due to errors
    # successful_records = inserted + updated
    # total_processed = inserted + updated + skipped + errors
API Response:
{
"run_id": "abc-123",
"status": "SUCCESS",
"total_records": 100,
"inserted_records": 75,
"updated_records": 10,
"skipped_records": 12,
"error_records": 3,
"failed_records": 0
}
| Feature | Estimated Effort | Priority |
|---|---|---|
| 1. DB Connection Config UI | 8-12 hours | High |
| 2. WebSocket Real-time | 10-15 hours | Medium |
| 3. Visual Cron Builder | 6-10 hours | Medium |
| 4. Mobile-Responsive UI | 8-12 hours | Medium |
| 5. Upsert Logic | 6-8 hours | High |
| Total | 38-57 hours | |
- Upsert Logic - Core functionality, no UI dependencies
- DB Connection Config - Security-critical, enables multi-environment
- Mobile-Responsive UI - Improves usability across devices
- Visual Cron Builder - UX improvement, builds on existing scheduler
- WebSocket Real-time - Polish feature, can be added last
Backend Tests (40+ cases):
- test_connections.py: CRUD, encryption, test connection, activation, file permissions
- test_websocket.py: Connection lifecycle, event broadcasting, room subscriptions
- test_upsert.py: MERGE generation, key matching, statistics tracking
- test_skip_logic.py: Skip condition evaluation, error continuation, statistics
  - Test skip when processed='Y'
  - Test continue on primary key error
  - Test continue on constraint violation
  - Test error logging with continuation
  - Test mixed results (insert + update + skip + error)
Frontend Tests (30+ cases):
- ConnectionEditor.test.tsx: Form validation, password masking, test button
- CronBuilder.test.tsx: Preset selection, cron generation, next runs display
- ResponsiveTable.test.tsx: Breakpoint switching, card rendering
- WebSocket.test.tsx: Connection, reconnection, event handling
- UpsertConfig.test.tsx: Skip column selection, skip value input, validation
E2E Tests (15+ cases):
- Full connection configuration flow
- Real-time run monitoring
- Schedule creation with visual builder
- Mobile navigation and interactions
- Upsert with skip condition (verify skipped rows logged)
- Run with row errors (verify process continues)
- Connection passwords encrypted with Fernet
- Passwords never returned in API responses
- Rate limiting on connection test endpoint
- Audit logging for configuration changes
- WebSocket authentication (session-based)
- CORS configuration for WebSocket
- Input validation on cron expressions
- SQL injection prevention in MERGE statements (parameterized queries)
- FastAPI REST API fully implemented
- 8 service modules for business logic
- Database models and schemas
- Celery task queue setup
- APScheduler integration for cron scheduling
- 13 test files (7 unit + 6 integration)
- Ready for production
- React + TypeScript application
- 7 pages with full CRUD
- 11 routes configured
- UI components built with Radix UI
- 12 test files
- Production-ready codebase
- Nested JSON flattening display (tree view UI)
- API response sample fetching (manual paste + auto-fetch)
- Oracle metadata querying for column types
- Transform suggestions based on type mismatches
- Mapping templates (localStorage)
- Batch column operations (apply transform to all)
- REST API endpoints for mapping CRUD
- Enhanced TaskWizard with dedicated mapping step
- Advanced mapping management in TaskDetail page
- Scheduling feature integrated
- Bearer, API Key, Basic Auth support
- Credential encryption with Fernet
- Cron-based scheduling with APScheduler
- Schedule management UI (Schedules page)
- ScheduleEditor component
- Auto-pause on consecutive failures
- Manual resume for paused schedules
- Run labeling with task name and retry badges
- Frontend-backend field alignment
- Oracle 11g compatibility
- Timezone handling
- DB connection configuration UI
- Upsert logic for insert/update records
- Ant Design migration
- Local SQLite app state via APP_DATABASE_URL
- Destination DB separation so broken Oracle connectivity does not break the app shell
- Task-level destination connection selection with active-connection fallback
Performance Optimization:
- ✅ Batch upsert operations (200-300x faster)
- ✅ Bulk SELECT for existing record checks
- ✅ Bulk UPDATE with SQL CASE statements
- ✅ Bulk INSERT for new records
- ✅ Process 500 rows per batch instead of row-by-row
- ✅ Reduced 20,000 queries to 60 for 10,000 rows
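The query-count reduction follows from the batch arithmetic: 10,000 rows in batches of 500 gives 20 batches, and one SELECT, one bulk UPDATE and one bulk INSERT per batch gives 60 queries. The sketch below shows that arithmetic plus one possible shape for a CASE-based UPDATE; the helper names and the exact SQL layout are assumptions, not the project's _bulk_update_rows().

```python
from itertools import islice

BATCH_SIZE = 500


def iter_batches(rows, batch_size=BATCH_SIZE):
    """Yield successive lists of at most batch_size rows."""
    it = iter(rows)
    while batch := list(islice(it, batch_size)):
        yield batch


def estimated_queries(row_count, batch_size=BATCH_SIZE, queries_per_batch=3):
    """Batches (rounded up) times SELECT + bulk UPDATE + bulk INSERT."""
    batches = -(-row_count // batch_size)  # ceiling division
    return batches * queries_per_batch


def build_case_update(table, key_col, set_col, row_count):
    """Build one UPDATE that sets set_col per key via CASE, using bind names."""
    whens = " ".join(f"WHEN :k{i} THEN :v{i}" for i in range(row_count))
    keys = ", ".join(f":k{i}" for i in range(row_count))
    return (f"UPDATE {table} SET {set_col} = CASE {key_col} {whens} END "
            f"WHERE {key_col} IN ({keys})")
```

Values travel as bind parameters (:k0, :v0, ...) so only identifiers are interpolated into the statement text.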
Critical Bug Fixes:
- ✅ Oracle DATE insertion (TO_DATE wrapper for YYYY-MM-DD strings)
- ✅ Duplicate TaskRun creation (removed from API endpoint)
- ✅ NULL constraint violations in UPDATE (skip None values)
- ✅ Duration display in UI (added duration_seconds to all run endpoints)
- ✅ Upsert keys parsing (handle list/string/None types)
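Two of the fixes above lend themselves to short sketches: normalizing upsert keys that may arrive as a list, a JSON string, a comma-separated string or None, and wrapping YYYY-MM-DD strings in TO_DATE(). The function names and the comma-separated fallback are assumptions made for illustration.

```python
import json
import re

ISO_DATE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def parse_upsert_keys(raw) -> list:
    """Normalize upsert keys given as list, JSON string, CSV string or None."""
    if raw is None:
        return []
    if isinstance(raw, (list, tuple)):
        return [str(k) for k in raw]
    text = str(raw).strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Assumption: a non-JSON string is a comma-separated column list.
        return [part.strip() for part in text.split(",") if part.strip()]
    if isinstance(parsed, list):
        return [str(k) for k in parsed]
    return [str(parsed)]


def bind_expression(column: str, value) -> str:
    """Return the SQL placeholder, wrapped in TO_DATE for date-like strings."""
    if isinstance(value, str) and ISO_DATE.match(value):
        return f"TO_DATE(:{column}, 'YYYY-MM-DD')"
    return f":{column}"
```

The pattern check is purely syntactic, so a string such as 2026-13-45 would still be wrapped and rejected by Oracle at execution time.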
UI Enhancements:
- ✅ Upsert Settings tab in TaskDetail
- ✅ Column Mappings display fixed (shows existing mappings)
- ✅ Duration field fixes in RunDetail (correct field names)
Infrastructure:
- ✅ Configurable LOG_LEVEL environment variable
- ✅ Enhanced debug logging for date parsing, SQL generation, batch progress
See docs/code-health-final.md for the full report. Summary of changes:
- CI/CD: 7-job GitHub Actions workflow (lint, test, audit, build × backend + frontend)
- Pydantic V2: task.py, task_run.py, and column_mapping.py schemas migrated to ConfigDict
- TaskStatus migrated to StrEnum (Python 3.11 idiomatic)
- oracle_router split for clean endpoint isolation in column_mappings.py
- response_model=TaskRunOut on run endpoints (was untyped dict)
- Validator bool-exclusion fixes for int, float, and string type checks
- Encryption: APP_ENV == "dev-only" guard; key no longer logged; Fernet key pinned 46.0.7
- OAuth fail-fast: ValueError raised when access_token absent
- Loguru tracebacks: logger.opt(exception=exc).error() (was exc_info= kwarg, silently ignored)
- Docker healthcheck: Python urllib.request (no curl dependency)
- Test suite: all TestNestedJsonFlattening tests rewired to call flatten(); 409+ passing
- E2E testing with Cypress/Playwright
- OAuth provider integration (Google, GitHub, Azure AD)
- Advanced search & filtering
- Certificate-based authentication (mTLS)
- Per-connection PBKDF2 random salts
- Date: May 5, 2026
- Version: 1.3.0
- Status: Phase 1-10 Complete | Performance Optimization Complete | 409+ tests passing
- Next Focus: Additional destination adapters, UX refinement, per-connection PBKDF2 salts, batch operation monitoring dashboard
This document is the single source of truth for project context and development practices. Keep it updated as the project evolves.