diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..bfe7b37
Binary files /dev/null and b/.DS_Store differ
diff --git a/Dockerfile.allinone b/Dockerfile.allinone
index f12dd1a..6cb376b 100644
--- a/Dockerfile.allinone
+++ b/Dockerfile.allinone
@@ -255,14 +255,22 @@ ENV CELERY_RESULT_BACKEND=redis://localhost:6379/0
ENV PYTHONPATH=/app/backend
ENV NEXT_FRONTEND_URL=http://localhost:3000
ENV AUTH_TYPE=LOCAL
-ENV ETL_SERVICE=DOCLING
+ENV ETL_SERVICE=UNSTRUCTURED
ENV EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2
+# Asyncio loop configuration (required for unstructured/uvicorn compatibility)
+ENV UVICORN_LOOP=asyncio
+ENV UNSTRUCTURED_HAS_PATCHED_LOOP=1
+
+# Disable LangSmith tracing by default (avoids SSL errors with corporate proxies)
+ENV LANGCHAIN_TRACING_V2=false
+ENV LANGSMITH_TRACING=false
+
# Frontend configuration (can be overridden at runtime)
# These are injected into the Next.js build at container startup
ENV NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000
ENV NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL
-ENV NEXT_PUBLIC_ETL_SERVICE=DOCLING
+ENV NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED
# Electric SQL configuration (ELECTRIC_DATABASE_URL is built dynamically by entrypoint from these values)
ENV ELECTRIC_DB_USER=electric
diff --git a/README.md b/README.md
index 6e69f8c..5d8fb2e 100644
--- a/README.md
+++ b/README.md
@@ -2,9 +2,11 @@
-**Your AI-Powered Personal Finance Assistant**
+
-An intelligent financial management platform that helps you track spending, optimize rewards, analyze investments, and make smarter money decisions using AI.
+### Your AI-Powered Personal CPA
+
+**Connect bank accounts • Upload tax forms • Get instant insights**
[](LICENSE)
[](https://www.typescriptlang.org/)
@@ -12,354 +14,179 @@ An intelligent financial management platform that helps you track spending, opti
[](https://fastapi.tiangolo.com/)
[](https://www.python.org/)
-[Features](#features) • [Quick Start](#quick-start) • [Architecture](#architecture) • [Documentation](#documentation) • [Contributing](#contributing)
+[Quick Start](#-quick-start) • [Features](#-features) • [Example Prompts](#-example-prompts) • [Privacy](#-privacy-first-design) • [Contributing](#-contributing)
---
-## 🌟 Features
+## 🚀 Quick Start
-### 🤖 AI-Powered Financial Advisor
-- **Smart Transaction Search**: Search your financial history using natural language queries
- - "How much did I spend on restaurants last year?"
- - "Show me all charges from United Airlines"
- - "Find my recurring subscriptions"
-- **Personalized Recommendations**: Get AI-driven suggestions for saving money and optimizing rewards
-- **Natural Language Queries**: Ask questions about your finances in plain English
-- **Predictive Analytics**: Forecast future spending and identify potential savings
+Choose your preferred setup method:
-### 💳 Smart Credit Card Optimization
-- **Rewards Analysis**: Analyze your spending patterns to find the best credit cards
- - "Which credit card should I use for restaurant purchases?"
- - "Am I using the right credit card for my spending?"
-- **Category-Based Optimization**: Get recommendations based on merchants and categories
-- **Multi-Card Strategy**: Optimize rewards across multiple credit cards
+### Option 1: All-in-One Docker (Easiest)
-### 📈 Investment Portfolio Management
-- **Real-Time Performance Tracking**: Monitor your investment returns with live market data
- - "How are my stocks performing today?"
- - "What's my portfolio return over the last year?"
- - "Show my month-over-month performance"
-- **Yahoo Finance Integration**: Fetches real-time and historical stock prices
-- **Cost Basis Tracking**: Calculate unrealized gains/losses across all holdings
-- **Time-Based Analysis**: Week-over-week, month-over-month, quarterly, and yearly performance
-
-### 🎯 Portfolio Allocation & Rebalancing
-- **Asset Allocation Analysis**: Understand your portfolio composition (stocks/bonds/cash)
- - "Is my portfolio allocation correct?"
- - "How should I rebalance according to Bogleheads philosophy?"
-- **Geographic Diversification**: Track US vs international exposure
-- **Investment Philosophy Comparison**: Compare against established strategies
- - Bogleheads Conservative (40/50/10)
- - Bogleheads Moderate (60/35/5)
- - Bogleheads Aggressive (90/10/0)
- - Three-Fund Portfolio
-- **Specific Rebalancing Recommendations**: Get dollar amounts for buying/selling
-- **Alignment Score**: See how well your portfolio matches your target allocation (0-100)
-
-### 💰 Tax Optimization
-- **Tax Loss Harvesting**: Identify opportunities to reduce your tax liability
- - "Can I harvest any tax losses?"
- - "What positions should I sell for tax savings?"
-- **Loss Identification**: Finds holdings with unrealized losses
-- **Tax Savings Calculator**: Estimates tax benefits based on your tax bracket
-- **Replacement Suggestions**: Recommends similar securities to avoid wash sales
-- **Wash Sale Warnings**: Alerts about IRS rules and compliance
-
-### 💳 Transaction & Spending Analysis
-- **Multi-Account Aggregation**: Connect bank accounts, credit cards, and investment platforms via Plaid
-- **Real-Time Tracking**: Monitor balances, transactions, and net worth in real-time
-- **Subscription Detection**: Identify and track recurring payments automatically
- - "Check if I have any recurring subscriptions"
- - "What am I paying monthly?"
-- **Category-Based Search**: Find transactions by category (restaurants, travel, etc.)
-- **Merchant Search**: Search by merchant name with fuzzy matching
-
-### 📊 Analytics & Reporting
-- **Interactive Dashboards**: Visualize spending trends, income, and investments
-- **Custom Reports**: Generate detailed financial reports and summaries
-- **Budget Management**: Set and monitor budgets with smart alerts
-- **Historical Comparisons**: Compare spending and performance across time periods
-
-### 🔒 Security & Privacy
-- **Bank-Level Encryption**: 256-bit SSL encryption for all data
-- **Secure Authentication**: OAuth 2.0 and Google Sign-In support
-- **Data Privacy**: Your financial data stays private and secure
-
-### 🔗 Integrations
-- **100+ Financial Institutions**: Banks, credit cards, investment platforms, crypto exchanges
-- **Plaid Integration**: Secure connection to financial accounts
-- **Yahoo Finance**: Real-time stock prices and historical market data
-- **Real-Time Sync**: Automatic transaction updates
-- **Manual Uploads**: Support for CSV files (bank statements, Fidelity positions, etc.)
-- **Export Options**: Download your data anytime
+```bash
+# Clone and run with a single container
+git clone https://github.com/yourusername/FinanceGPT.git
+cd FinanceGPT
+
+# Copy environment file and add your API keys
+cp .env.example .env
+# Edit .env with your OPENAI_API_KEY, PLAID_CLIENT_ID, PLAID_SECRET
+
+# Start FinanceGPT
+docker compose -f docker-compose.quickstart.yml up -d
+```
+
+🎉 **Open http://localhost:3000** — You're done!
---
-## 💬 Example Prompts
+### Option 2: Local Development (macOS)
-### Transaction Search & Analysis
-```
-"How much did I spend on restaurants last year?"
-"Show me all United Airlines charges"
-"Find transactions over $100 in the last month"
-"What did I spend on groceries this week?"
-"Show my Amazon purchases"
+```bash
+# Clone the repository
+git clone https://github.com/yourusername/FinanceGPT.git
+cd FinanceGPT
+
+# Start infrastructure (PostgreSQL, Redis, ElectricSQL)
+docker compose up -d db redis electric
+
+# Run the dev script (opens 3 terminal tabs automatically)
+chmod +x dev.sh
+./dev.sh
```
-### Credit Card Optimization
+This starts:
+- 🔧 **Backend API** on http://localhost:8000
+- 🔄 **Celery Worker** for background tasks
+- 🌐 **Frontend** on http://localhost:3000
+
+---
+
+### Option 3: Full Docker Stack
+
+```bash
+# Clone the repository
+git clone https://github.com/yourusername/FinanceGPT.git
+cd FinanceGPT
+
+# Configure environment
+cp financegpt_backend/.env.example financegpt_backend/.env
+# Edit financegpt_backend/.env with your API keys
+
+# Build and run all services
+docker compose up -d --build
+
+# View logs
+docker compose logs -f
```
-"Which credit card should I use for restaurants?"
-"Am I using the right credit card for gas purchases?"
-"What's the best card for my travel spending?"
-"Optimize my credit card usage"
+
+---
+
+## 🔐 Privacy-First Design
+
+Your financial data is sensitive. FinanceGPT is built with privacy as a core principle:
+
+| Feature | How It Protects You |
+|---------|---------------------|
+| **🔒 PII Masking** | SSN and EIN are **masked before any LLM call** (`123-45-6789` → `XXX-XX-XXXX`). Your tax forms never expose sensitive IDs. |
+| **🏠 Self-Hostable** | Run entirely on your own hardware. Paired with a local model (e.g. Ollama), your data never leaves your machine. |
+| **🤖 BYO Model** | Use your own LLM (OpenAI, Anthropic, or **local Ollama**). No vendor lock-in. |
+| **🔐 Local Processing** | Sensitive field extraction (SSN, EIN) happens locally—not via cloud APIs. |
+| **🗄️ Your Database** | All data stored in your PostgreSQL instance. Export or delete anytime. |
+| **🚫 No Telemetry** | Zero tracking, zero analytics, zero data collection. |
+
+```python
+# Example: How we handle your W2
+raw_text = "SSN: 123-45-6789, Wages: $183,000"
+masked_text = mask_pii_in_text(raw_text)
+# → "SSN: XXX-XX-XXXX, Wages: $183,000"
+# Only masked_text is sent to the LLM
```
-### Investment Performance
+---
+
+## 💬 Example Prompts
+
+Just ask questions in plain English. FinanceGPT understands context.
+
+### 💰 Income & Tax Questions
```
-"How are my stocks performing today?"
-"What's my portfolio return over the last year?"
-"Show my month-over-month investment performance"
-"How much have my investments grown this quarter?"
-"What's my total portfolio value?"
+"How much did I earn in 2024?"
+"What was my total federal tax withheld?"
+"Will I get a tax refund this year?"
+"Show me my W2 summary"
+"What state taxes did I pay?"
```
-### Portfolio Allocation & Rebalancing
+### 📊 Spending Analysis
```
-"Is my portfolio allocation correct?"
-"How should I rebalance according to Bogleheads philosophy?"
-"What's my exposure to international stocks?"
-"Am I too heavily invested in US stocks?"
-"Should I buy more bonds or stocks?"
-"Compare my portfolio to a three-fund strategy"
+"How much did I spend on restaurants last month?"
+"What are my recurring subscriptions?"
+"Find all Amazon purchases over $100"
+"What's my biggest expense category?"
+"Show spending trends for the last 3 months"
```
-### Tax Loss Harvesting
+### 💳 Credit Card Optimization
```
-"Can I harvest any tax losses?"
-"What stocks should I sell for tax losses?"
-"How much can I save in taxes by tax loss harvesting?"
-"Are there any positions with unrealized losses?"
-"Show me tax optimization opportunities"
+"Which card should I use for groceries?"
+"Am I using the right credit card for travel?"
+"How much rewards am I missing out on?"
+"Optimize my credit card usage"
```
-### Subscriptions & Recurring Payments
+### 📈 Investment Portfolio
```
-"Check if I have any recurring subscriptions"
-"What subscriptions am I paying for?"
-"Find all my monthly recurring charges"
-"Which services am I subscribed to?"
+"How are my stocks performing today?"
+"What's my portfolio return this year?"
+"Is my allocation correct for my age?"
+"Should I rebalance according to Bogleheads?"
+"Can I harvest any tax losses?"
```
-### Financial Planning
+### 🏦 Account Overview
```
"What's my net worth?"
-"How much am I saving each month?"
-"Show my spending trends over the last 3 months"
-"What's my biggest expense category?"
+"Show all my account balances"
+"How much do I have in savings?"
+"What's my monthly cash flow?"
```
---
-## �️ AI Tools & Capabilities
-
-FinanceGPT uses specialized AI tools to analyze your financial data and provide actionable insights:
-
-### 1. **Transaction Search** (`search_transactions`)
-- Searches through all your financial transactions using keywords and categories
-- Supports both Plaid-connected accounts and manual CSV uploads
-- Fuzzy merchant name matching for accurate results
-- Date range filtering and category-based filtering
-- Returns transaction summaries with totals and breakdowns
-
-### 2. **Credit Card Optimizer** (`optimize_credit_card_usage`)
-- Analyzes spending patterns to recommend optimal credit cards
-- Matches merchant categories to card rewards programs
-- Compares rewards rates across multiple cards
-- Provides specific recommendations per spending category
-- Supports both manual uploads and Plaid data
-
-### 3. **Portfolio Performance** (`calculate_portfolio_performance`)
-- Fetches real-time stock prices from Yahoo Finance
-- Calculates returns over custom time periods (day, week, month, quarter, year)
-- Compares current prices to historical prices for accurate performance
-- Shows individual holding performance and total portfolio returns
-- Supports both snapshot comparisons and live price lookups
-
-### 4. **Portfolio Allocation Analyzer** (`analyze_portfolio_allocation`)
-- Analyzes asset class distribution (stocks/bonds/cash)
-- Calculates geographic exposure (US vs international)
-- Compares portfolio to investment philosophies (Bogleheads, Three-Fund)
-- Provides specific rebalancing recommendations with dollar amounts
-- Gives alignment score (0-100) showing how close you are to target
-
-### 5. **Tax Loss Harvesting** (`analyze_tax_loss_harvesting`)
-- Identifies positions with unrealized losses
-- Calculates potential tax savings based on your tax bracket
-- Suggests replacement securities to avoid wash sales
-- Provides sell/buy recommendations for tax optimization
-- Warns about wash sale rules and compliance
-
-### 6. **Subscription Finder** (`find_subscriptions`)
-- Automatically detects recurring charges in your transaction history
-- Identifies monthly, quarterly, and annual subscriptions
-- Calculates total subscription costs
-- Helps you find forgotten or unused subscriptions
-
-### 7. **Knowledge Base Search** (`search_knowledge_base`)
-- Searches across all your uploaded financial documents
-- Vector-based semantic search for accurate results
-- Supports PDFs, CSVs, bank statements, and investment reports
-- Context-aware retrieval for answering complex questions
-
----
-
-## �🚀 Quick Start
-
-### Option 1: Quick Start (Recommended for Testing)
-
-The fastest way to try FinanceGPT:
-
-```bash
-# 1. Clone the repository
-git clone https://github.com/yourusername/FinanceGPT.git
-cd FinanceGPT
-
-# 2. Run the quick start script
-chmod +x start-financegpt.sh
-./start-financegpt.sh
-```
-
-This uses pre-built Docker images and requires minimal configuration. Access the app at **http://localhost:3000**
-
-### Option 2: Development Setup
-
-For development with full customization:
-
-#### Prerequisites
-
-- **Docker** and **Docker Compose** (required)
-- **Node.js** 18+ and **pnpm** (for frontend development)
-- **Python** 3.11+ (for backend development)
-
-#### Required Environment Variables
-
-Before starting, you **must** configure these essential variables:
-
-1. **Create backend environment file:**
- ```bash
- cp financegpt_backend/.env.example financegpt_backend/.env
- ```
-
-2. **Edit `financegpt_backend/.env` and set these required variables:**
-
- ```env
- # === REQUIRED: LLM API Key ===
- # You MUST configure at least one LLM provider for FinanceGPT to work
- # Get your API key from the respective provider:
-
- # OpenAI (recommended for best results)
- OPENAI_API_KEY=sk-...
-
- # OR Anthropic Claude
- ANTHROPIC_API_KEY=sk-ant-...
-
- # OR Google Gemini (free tier available)
- GOOGLE_API_KEY=AIza...
-
- # === REQUIRED: Plaid (for bank account connections) ===
- # Sign up at https://dashboard.plaid.com/team/keys
- PLAID_CLIENT_ID=your_plaid_client_id
- PLAID_SECRET=your_plaid_secret
- PLAID_ENV=sandbox # Use 'sandbox' for testing
-
- # === REQUIRED: Security ===
- SECRET_KEY=your-random-secret-key-change-this-in-production
-
- # === Optional: Document Processing ===
- # Only needed if you want to parse PDFs/documents
- # Get free API key from https://unstructured.io/
- # UNSTRUCTURED_API_KEY=your_key_here
- ```
-
-3. **Create frontend environment file:**
- ```bash
- cp financegpt_web/.env.example financegpt_web/.env.local
- ```
-
- Edit `financegpt_web/.env.local`:
- ```env
- # Backend API URL
- NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000
-
- # Auth type (LOCAL or GOOGLE)
- NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL
-
- # Document parsing service
- NEXT_PUBLIC_ETL_SERVICE=DOCLING
- ```
-
-#### Installation Steps
-
-1. **Start infrastructure services (PostgreSQL, Redis, etc.)**
- ```bash
- docker-compose up -d db redis electric pgadmin
- ```
-
-2. **Install and run backend**
- ```bash
- cd financegpt_backend
-
- # Create virtual environment
- python -m venv .venv
- source .venv/bin/activate # On Windows: .venv\Scripts\activate
-
- # Install dependencies
- pip install -e .
-
- # Run database migrations
- alembic upgrade head
-
- # Start the backend server
- uvicorn main:app --reload --host 0.0.0.0 --port 8000
- ```
-
-3. **In a new terminal, install and run frontend**
- ```bash
- cd financegpt_web
-
- # Install dependencies
- pnpm install
-
- # Start development server
- pnpm dev
- ```
-
-4. **Access the application**
- - 🌐 **Frontend**: http://localhost:3000
- - 🔧 **Backend API**: http://localhost:8000
- - 📚 **API Documentation**: http://localhost:8000/docs
- - 🗄️ **pgAdmin** (Database UI): http://localhost:5050
- - Email: `admin@financegpt.com`
- - Password: `financegpt`
-
-### Option 3: Full Docker Deployment
-
-Build and run everything with Docker:
-
-```bash
-# Build and start all services
-docker-compose up -d
+## 🌟 Features
-# View logs
-docker-compose logs -f
+### 🤖 AI-Powered Financial Advisor
+- **Natural Language Queries**: Ask questions about your finances in plain English
+- **Smart Transaction Search**: "How much did I spend on restaurants last year?"
+- **Personalized Recommendations**: AI-driven suggestions for saving money
+- **Tax Form Analysis**: Upload W2s, 1099s and get instant summaries
-# Stop all services
-docker-compose down
-```
+### 💳 Smart Credit Card Optimization
+- **Rewards Maximization**: Get the best card for each purchase category
+- **Spending Pattern Analysis**: Identify where you're leaving money on the table
+- **Multi-Card Strategy**: Optimize rewards across all your cards
-Access at **http://localhost:3000**
+### 📈 Investment Portfolio Management
+- **Real-Time Performance**: Track returns with live Yahoo Finance data
+- **Time-Based Analysis**: WoW, MoM, QoQ, YoY performance tracking
+- **Tax Loss Harvesting**: Find opportunities to reduce your tax bill
+- **Rebalancing Recommendations**: Compare to Bogleheads, Three-Fund Portfolio
+
+### 📋 Tax Document Processing
+- **Supported Forms**: W2, 1099-INT, 1099-DIV, 1099-B, 1099-MISC, 1095-C
+- **LLM-Powered Extraction**: Accurate parsing with structured output
+- **Tax Estimate**: Calculate potential refund or amount owed
+- **State Tax Support**: Extracts state wages and withholdings
+
+### 💰 Transaction & Spending Analysis
+- **100+ Financial Institutions**: Connect via Plaid
+- **Subscription Detection**: Find forgotten recurring charges
+- **Category Analysis**: Understand where your money goes
+- **Historical Comparisons**: Compare spending across time periods
---
@@ -367,93 +194,68 @@ Access at **http://localhost:3000**
### Tech Stack
-#### Frontend (`financegpt_web/`)
-- **Framework**: Next.js 15 with App Router
-- **Language**: TypeScript
-- **Styling**: Tailwind CSS
-- **UI Components**: Shadcn/ui, Radix UI
-- **State Management**: Jotai
-- **Animations**: Framer Motion
-- **Real-Time**: ElectricSQL
-
-#### Backend (`financegpt_backend/`)
-- **Framework**: FastAPI
-- **Language**: Python 3.11+
-- **AI/ML**: LangChain, OpenAI GPT-4
-- **Database**: PostgreSQL with SQLAlchemy
-- **Task Queue**: Celery with Redis
-- **Financial Data**: Plaid API
-- **Authentication**: OAuth 2.0
-
-#### Infrastructure
-- **Database**: PostgreSQL 15
-- **Cache**: Redis
-- **Message Broker**: Redis (for Celery)
-- **Container**: Docker & Docker Compose
+| Layer | Technology | Purpose |
+|-------|------------|---------|
+| **Frontend** | Next.js 15, TypeScript, Tailwind | Modern web UI with server components |
+| **Backend** | FastAPI, Python 3.11+ | Async API with auto-generated docs |
+| **Database** | PostgreSQL + pgvector | Relational + vector search |
+| **Task Queue** | Celery + Redis | Background document processing |
+| **AI** | LiteLLM | Provider-agnostic (OpenAI, Anthropic, Ollama) |
+| **Banking** | Plaid API | 100+ financial institution connections |
+| **Auth** | Better Auth | OAuth 2.0, Google Sign-In |
### Project Structure
```
FinanceGPT/
-├── financegpt_web/ # Next.js frontend application
+├── financegpt_web/ # Next.js frontend
│ ├── app/ # App router pages
│ ├── components/ # React components
-│ ├── lib/ # Utilities and helpers
-│ └── public/ # Static assets
-├── financegpt_backend/ # FastAPI backend application
+│ └── lib/ # Utilities
+├── financegpt_backend/ # FastAPI backend
│ ├── app/
│ │ ├── agents/ # AI agents and tools
+│ │ ├── parsers/ # Tax form parsers
│ │ ├── routes/ # API endpoints
-│ │ ├── services/ # Business logic
-│ │ ├── tasks/ # Celery tasks
-│ │ └── utils/ # Utilities
+│ │ └── tasks/ # Celery tasks
│ └── alembic/ # Database migrations
-├── financegpt_browser_extension/ # Browser extension
-├── docker-compose.yml # Docker services configuration
-└── README.md # This file
+├── docker-compose.yml # Full stack deployment
+├── docker-compose.quickstart.yml # All-in-one container
+└── dev.sh # Local development script
```
---
-## 📖 Documentation
-
-### Configuration
+## 📖 Configuration
-#### Plaid API Setup
-1. Sign up for a [Plaid account](https://plaid.com/)
-2. Get your API keys (Client ID and Secret)
-3. Add to `financegpt_backend/.env`:
- ```env
- PLAID_CLIENT_ID=your_client_id
- PLAID_SECRET=your_secret
- PLAID_ENV=sandbox # or development/production
- ```
+### Required Environment Variables
-#### OpenAI API Setup
-1. Get your API key from [OpenAI](https://platform.openai.com/)
-2. Add to `financegpt_backend/.env`:
- ```env
- OPENAI_API_KEY=your_api_key
- ```
-
-#### Database Configuration
```env
-DATABASE_URL=postgresql://user:password@localhost:5432/financegpt
+# LLM Provider (choose one)
+OPENAI_API_KEY=sk-...
+# or ANTHROPIC_API_KEY=sk-ant-...
+# or GOOGLE_API_KEY=AIza...
+
+# Plaid (for bank connections)
+PLAID_CLIENT_ID=your_client_id
+PLAID_SECRET=your_secret
+PLAID_ENV=sandbox
+
+# Security
+SECRET_KEY=your-random-secret-key
```
-### API Endpoints
+### Optional Configuration
-#### Financial Data
-- `GET /api/accounts` - List all connected accounts
-- `GET /api/transactions` - Get transactions
-- `POST /api/plaid/link-token` - Create Plaid Link token
-- `POST /api/plaid/exchange-token` - Exchange public token
+```env
+# Document Processing
+UNSTRUCTURED_API_KEY=... # For PDF parsing
+ETL_SERVICE=DOCLING # Or UNSTRUCTURED (the default set in Dockerfile.allinone)
-#### AI Features
-- `POST /api/chat` - Chat with AI assistant
-- `POST /api/analyze/spending` - Analyze spending patterns
-- `POST /api/optimize/credit-card` - Get credit card recommendations
-- `GET /api/insights` - Get personalized insights
+# Voice Features
+TTS_SERVICE=local/kokoro
+STT_SERVICE=local/base
+```
---
@@ -461,42 +263,19 @@ DATABASE_URL=postgresql://user:password@localhost:5432/financegpt
### Running Tests
-**Frontend:**
-```bash
-cd financegpt_web
-pnpm test
-```
-
-**Backend:**
-```bash
-cd financegpt_backend
-pytest
-```
-
-### Code Quality
-
-**Frontend:**
```bash
-pnpm lint
-pnpm type-check
-```
+# Backend
+cd financegpt_backend && pytest
-**Backend:**
-```bash
-ruff check .
-mypy .
+# Frontend
+cd financegpt_web && pnpm test
```
### Database Migrations
-**Create a new migration:**
```bash
cd financegpt_backend
alembic revision --autogenerate -m "Description"
-```
-
-**Apply migrations:**
-```bash
alembic upgrade head
```
@@ -504,7 +283,7 @@ alembic upgrade head
## 🤝 Contributing
-We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.
+We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md).
1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
@@ -522,18 +301,16 @@ This project is licensed under the Apache License 2.0 - see the [LICENSE](LICENS
## 🙏 Acknowledgments
-- Built with [Next.js](https://nextjs.org/), [FastAPI](https://fastapi.tiangolo.com/), and [LangChain](https://langchain.com/)
+- Built on [SurfSense](https://github.com/MODSetter/SurfSense), an open-source NotebookLM alternative
- Financial data powered by [Plaid](https://plaid.com/)
-- AI capabilities powered by [OpenAI](https://openai.com/)
+- AI capabilities via [LiteLLM](https://github.com/BerriAI/litellm)
---
-## 📧 Contact
+
-For questions or support, please open an issue or contact us at support@financegpt.com
+Made with ❤️ for anyone who's ever stared at a W2 wondering what it all means.
----
+**[⬆ Back to Top](#financegpt-)**
-
-Made with ❤️ by the FinanceGPT Team
diff --git a/dev.sh b/dev.sh
new file mode 100755
index 0000000..5686066
--- /dev/null
+++ b/dev.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+# FinanceGPT Local Development Script
+# Runs all services in separate terminal tabs (macOS)
+
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+BACKEND_DIR="$SCRIPT_DIR/financegpt_backend"
+WEB_DIR="$SCRIPT_DIR/financegpt_web"
+
+echo "🚀 Starting FinanceGPT development environment..."
+echo " Backend: $BACKEND_DIR"
+echo " Web: $WEB_DIR"
+
+# Check if running on macOS
+if [[ "$OSTYPE" != "darwin"* ]]; then
+ echo "❌ This script is designed for macOS. For Linux, use tmux or run manually."
+ echo ""
+ echo "Commands to run manually:"
+ echo " Terminal 1 (Backend API):"
+ echo " cd $BACKEND_DIR && source .venv/bin/activate && uv pip install -e . && python main.py --reload"
+ echo ""
+ echo " Terminal 2 (Celery Worker):"
+ echo " cd $BACKEND_DIR && source .venv/bin/activate && celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo"
+ echo ""
+ echo " Terminal 3 (Frontend):"
+ echo " cd $WEB_DIR && pnpm run dev"
+ exit 1
+fi
+
+# Create venv if it doesn't exist
+if [ ! -d "$BACKEND_DIR/.venv" ]; then
+ echo "📦 Creating virtual environment..."
+ cd "$BACKEND_DIR"
+ python3 -m venv .venv
+fi
+
+# Open Terminal tabs using AppleScript
+osascript < None:
+ """Change tax_forms.document_id FK from SET NULL to CASCADE."""
+ # Drop the existing foreign key constraint
+ op.drop_constraint('tax_forms_document_id_fkey', 'tax_forms', type_='foreignkey')
+
+ # Recreate with CASCADE on delete
+ op.create_foreign_key(
+ 'tax_forms_document_id_fkey',
+ 'tax_forms',
+ 'documents',
+ ['document_id'],
+ ['id'],
+ ondelete='CASCADE'
+ )
+
+
+def downgrade() -> None:
+ """Revert to SET NULL behavior."""
+ op.drop_constraint('tax_forms_document_id_fkey', 'tax_forms', type_='foreignkey')  # drop the existing FK so it can be recreated with a different ON DELETE rule
+
+ op.create_foreign_key(
+ 'tax_forms_document_id_fkey',  # constraint name (same name that was just dropped)
+ 'tax_forms',  # source table holding the FK column
+ 'documents',  # referenced table
+ ['document_id'],  # local column(s) on tax_forms
+ ['id'],  # referenced column(s) on documents
+ ondelete='SET NULL'  # deleting a document nulls tax_forms.document_id instead of deleting the tax form row
+ )
diff --git a/financegpt_backend/app/agents/new_chat/system_prompt.py b/financegpt_backend/app/agents/new_chat/system_prompt.py
index d96b625..a088979 100644
--- a/financegpt_backend/app/agents/new_chat/system_prompt.py
+++ b/financegpt_backend/app/agents/new_chat/system_prompt.py
@@ -519,14 +519,15 @@
11. analyze_tax_data: Query uploaded and processed tax forms to answer tax questions.
- **USE THIS TOOL** when users ask about:
* Tax form data: "How much did I earn?", "What were my wages?"
- * Tax withholdings: "How much federal tax was withheld?"
+ * Tax withholdings: "How much federal tax was withheld?", "State taxes withheld?"
* Interest income: "Did I have interest income?", "1099-INT summary?"
* Dividend income: "What dividends did I receive?"
* Capital gains: "Stock sale gains/losses?", "1099-B summary?"
- * W2 employment: "Where did I work?", "Wages by employer?"
+ * W2 employment: "Where did I work?", "Wages by employer?", "State wages?"
* Tax year summaries: "2024 tax summary", "Total income 2024"
+ * Tax estimates/refunds: "Will I get a refund?", "How much do I owe?", "Estimate my taxes"
- IMPORTANT: This tool queries ONLY uploaded tax forms (W2, 1099-MISC, 1099-INT, 1099-DIV, 1099-B)
- - Does NOT calculate estimates or current year projections - only historical data from uploaded forms
+ - W2 forms include state tax info (state code, state wages, state income tax withheld)
- Args:
- query_type: Type of analysis (required). Options:
* "income_summary": Total income across all sources
@@ -534,12 +535,13 @@
* "interest_income": Interest from 1099-INT forms
* "dividends_income": Dividends from 1099-DIV forms
* "capital_gains": Capital gains from 1099-B forms
- * "w2_summary": W2 employment wages and withholdings
+ * "w2_summary": W2 employment wages and withholdings (includes state tax info)
* "all_forms": List all uploaded tax forms
+ * "tax_estimate": Estimate federal tax liability and potential refund/amount owed
- tax_year: Specific tax year (e.g., 2024) or omit for all years
- form_types: Optional filter by form types (e.g., ["W2", "1099-INT"])
- Returns: Structured tax data with totals, breakdowns, and per-form details
- - Privacy: All PII (SSN, EIN) is hashed - never exposed in responses
+ - Privacy: All PII (SSN, EIN) is masked - never exposed in responses
@@ -645,14 +647,25 @@
- User: "Which companies did I work for in 2024?"
- Call: `analyze_tax_data(query_type="w2_summary", tax_year=2024)`
- - Returns: W2 forms with employers, wages, and withholdings
- - Summarize total wages and tax withheld
+ - Returns: W2 forms with employers, wages, withholdings, and state tax info
+ - Summarize total wages and federal/state tax withheld
- User: "List all my uploaded tax forms"
- Call: `analyze_tax_data(query_type="all_forms")`
- Returns: All tax forms with types, years, and processing status
- Note which forms need review (low confidence extractions)
+- User: "Will I get a tax refund?" or "How much do I owe?"
+ - Call: `analyze_tax_data(query_type="tax_estimate", tax_year=2024)`
+ - Returns: Estimated federal tax, total withheld, refund or amount owed
+ - Note: Simplified estimate using single filer status and standard deduction
+ - Always recommend consulting a tax professional for accuracy
+
+- User: "What state income tax did I pay?"
+ - Call: `analyze_tax_data(query_type="w2_summary", tax_year=2024)`
+ - Returns: W2 data including state_code, state_wages, and state_income_tax withheld
+ - Summarize state taxes by state if multiple W2s from different states
+
- User: "How much more am I spending this month compared to last month?"
- First call: `search_knowledge_base(query="transactions spending", start_date="2025-12-01", end_date="2025-12-31")` (Dec)
- Second call: `search_knowledge_base(query="transactions spending", start_date="2026-01-01", end_date="2026-01-26")` (Jan)
diff --git a/financegpt_backend/app/agents/new_chat/tools/tax_analysis.py b/financegpt_backend/app/agents/new_chat/tools/tax_analysis.py
index 0f4413b..3e58169 100644
--- a/financegpt_backend/app/agents/new_chat/tools/tax_analysis.py
+++ b/financegpt_backend/app/agents/new_chat/tools/tax_analysis.py
@@ -5,22 +5,47 @@
- "What was my total federal tax withheld?"
- "Did I have any interest income?"
- "What were my capital gains from stock sales?"
+- "Can you estimate my tax refund?"
"""
import logging
from datetime import datetime
from decimal import Decimal
from typing import Any
+from uuid import UUID
from langchain_core.tools import tool
from sqlalchemy import and_, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import selectinload
-from app.schemas.tax_forms import TaxFormWithDetails
+from app.db import (
+ TaxForm,
+ W2Form,
+ Form1099Misc,
+ Form1099Int,
+ Form1099Div,
+ Form1099B,
+)
logger = logging.getLogger(__name__)
+# 2024 Federal Tax Brackets (Single Filers)
+# Each entry is (upper bound of the bracket in dollars of taxable income, marginal rate).
+# Entries are ordered by ascending upper bound; _calculate_tax_from_brackets walks them
+# in order and converts each bound with Decimal(str(...)), so the last bound is infinite.
+TAX_BRACKETS_2024_SINGLE = [
+ (11600, Decimal("0.10")), # 10% up to $11,600
+ (47150, Decimal("0.12")), # 12% up to $47,150
+ (100525, Decimal("0.22")), # 22% up to $100,525
+ (191950, Decimal("0.24")), # 24% up to $191,950
+ (243725, Decimal("0.32")), # 32% up to $243,725
+ (609350, Decimal("0.35")), # 35% up to $609,350
+ (float('inf'), Decimal("0.37")), # 37% above $609,350
+]
+
+# Standard Deduction 2024
+# Subtracted from gross income before the brackets are applied (see _get_tax_estimate,
+# which assumes single filer status).
+STANDARD_DEDUCTION_2024 = Decimal("14600")
+
+
def create_tax_analysis_tool(user_id: str, search_space_id: int, db_session: AsyncSession):
"""Create the tax analysis tool for the agent.
@@ -42,12 +67,19 @@ async def analyze_tax_data(
"""Query uploaded tax forms to answer tax-related questions.
Use this tool when users ask about income, taxes withheld, interest, dividends,
- capital gains, or W2 employment information from their uploaded tax documents.
+ capital gains, W2 employment information, or tax estimates/refunds from their
+ uploaded tax documents.
Args:
- query_type: Type of tax analysis - "income_summary", "tax_summary",
- "interest_income", "dividends_income", "capital_gains",
- "w2_summary", or "all_forms"
+ query_type: Type of tax analysis:
+ - "income_summary": Total income across all sources
+ - "tax_summary": Total taxes withheld from all sources
+ - "interest_income": Interest income from 1099-INT forms
+ - "dividends_income": Dividend income from 1099-DIV forms
+ - "capital_gains": Capital gains from 1099-B forms
+ - "w2_summary": W2 wage and withholding summary
+ - "all_forms": List all tax forms
+ - "tax_estimate": Estimate tax liability and potential refund
tax_year: Specific tax year (e.g., 2024) or None for all years
form_types: Optional list of form types to filter (e.g., ["W2", "1099-INT"])
@@ -55,6 +87,7 @@ async def analyze_tax_data(
Dictionary with analysis results including totals, breakdowns, and details
"""
return await _analyze_tax_data_impl(
+ db_session=db_session,
user_id=user_id,
search_space_id=search_space_id,
query_type=query_type,
@@ -66,6 +99,7 @@ async def analyze_tax_data(
async def _analyze_tax_data_impl(
+ db_session: AsyncSession,
user_id: str,
search_space_id: int,
query_type: str,
@@ -73,150 +107,662 @@ async def _analyze_tax_data_impl(
form_types: list[str] | None = None,
) -> dict[str, Any]:
"""Implementation of tax data analysis."""
- # Note: Actual database queries would go here
- # For now, returning placeholder structure
if query_type == "income_summary":
- return await _get_income_summary(user_id, search_space_id, tax_year)
+ return await _get_income_summary(db_session, user_id, search_space_id, tax_year)
elif query_type == "tax_summary":
- return await _get_tax_summary(user_id, search_space_id, tax_year)
+ return await _get_tax_summary(db_session, user_id, search_space_id, tax_year)
elif query_type == "interest_income":
- return await _get_interest_income(user_id, search_space_id, tax_year)
+ return await _get_interest_income(db_session, user_id, search_space_id, tax_year)
elif query_type == "dividends_income":
- return await _get_dividends_income(user_id, search_space_id, tax_year)
+ return await _get_dividends_income(db_session, user_id, search_space_id, tax_year)
elif query_type == "capital_gains":
- return await _get_capital_gains(user_id, search_space_id, tax_year)
+ return await _get_capital_gains(db_session, user_id, search_space_id, tax_year)
elif query_type == "w2_summary":
- return await _get_w2_summary(user_id, search_space_id, tax_year)
+ return await _get_w2_summary(db_session, user_id, search_space_id, tax_year)
elif query_type == "all_forms":
- return await _get_all_forms(user_id, search_space_id, tax_year, form_types)
+ return await _get_all_forms(db_session, user_id, search_space_id, tax_year, form_types)
+ elif query_type == "tax_estimate":
+ return await _get_tax_estimate(db_session, user_id, search_space_id, tax_year)
else:
return {"error": f"Unknown query type: {query_type}"}
+async def _get_w2_summary(
+    db_session: AsyncSession,
+    user_id: str,
+    search_space_id: int,
+    tax_year: int | None,
+) -> dict[str, Any]:
+    """Summarize the user's W2 forms: per-employer details plus overall totals.
+
+    Args:
+        db_session: Async SQLAlchemy session used to run the query.
+        user_id: Owner of the forms, as a UUID string.
+        search_space_id: Search space the forms belong to.
+        tax_year: Restrict to this tax year; a falsy value includes every year.
+
+    Returns:
+        A dict that always carries "query_type". On success it has has_data=True,
+        an "employers" list and "total_*" figures (floats). When nothing usable is
+        stored it has has_data=False and a "message". If the lookup raises, it
+        carries an "error" string instead.
+    """
+    try:
+        # Build query for tax forms with W2 data
+        query = (
+            select(TaxForm)
+            .options(selectinload(TaxForm.w2_form))
+            .where(
+                and_(
+                    TaxForm.user_id == UUID(user_id),
+                    TaxForm.search_space_id == search_space_id,
+                    TaxForm.form_type == "W2",
+                )
+            )
+        )
+
+        if tax_year:
+            query = query.where(TaxForm.tax_year == tax_year)
+
+        query = query.order_by(TaxForm.tax_year.desc())
+
+        result = await db_session.execute(query)
+        tax_forms = result.scalars().all()
+
+        if not tax_forms:
+            return {
+                "query_type": "w2_summary",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "No W2 forms found. Upload your W2s to see employment income and withholdings.",
+            }
+
+        # Aggregate data
+        zero = Decimal("0.00")
+        employers = []
+        total_wages = zero
+        total_federal_withheld = zero
+        total_ss_withheld = zero
+        total_medicare_withheld = zero
+        total_state_withheld = zero
+
+        for form in tax_forms:
+            w2 = form.w2_form
+            if not w2:
+                # The TaxForm row exists but has no extracted W2 detail attached.
+                continue
+
+            employers.append(
+                {
+                    "employer_name": w2.employer_name or "Unknown Employer",
+                    "tax_year": form.tax_year,
+                    "wages": float(w2.wages_tips_compensation or 0),
+                    "federal_withheld": float(w2.federal_income_tax_withheld or 0),
+                    "social_security_withheld": float(w2.social_security_tax_withheld or 0),
+                    "medicare_withheld": float(w2.medicare_tax_withheld or 0),
+                    # State details let the caller break state tax down per state.
+                    # getattr keeps this working if the model lacks these columns.
+                    "state_code": getattr(w2, "state_code", None),
+                    "state_wages": float(getattr(w2, "state_wages", None) or 0),
+                    "state_withheld": float(w2.state_income_tax or 0),
+                    "retirement_plan": w2.retirement_plan,
+                    "processing_status": form.processing_status,
+                    "needs_review": form.needs_review,
+                }
+            )
+
+            total_wages += w2.wages_tips_compensation or zero
+            total_federal_withheld += w2.federal_income_tax_withheld or zero
+            total_ss_withheld += w2.social_security_tax_withheld or zero
+            total_medicare_withheld += w2.medicare_tax_withheld or zero
+            total_state_withheld += w2.state_income_tax or zero
+
+        if not employers:
+            # Forms were uploaded but none carries W2 detail; reporting zero totals
+            # as real data would mislead income summaries and tax estimates.
+            return {
+                "query_type": "w2_summary",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "W2 forms were found but no extracted W2 details are available for them yet.",
+            }
+
+        return {
+            "query_type": "w2_summary",
+            "tax_year": tax_year or "all years",
+            "has_data": True,
+            "employers": employers,
+            "total_wages": float(total_wages),
+            "total_federal_withheld": float(total_federal_withheld),
+            "total_social_security_withheld": float(total_ss_withheld),
+            "total_medicare_withheld": float(total_medicare_withheld),
+            "total_state_withheld": float(total_state_withheld),
+            "total_w2_forms": len(employers),
+        }
+    except Exception as e:
+        logger.exception("Error getting W2 summary")
+        return {
+            "query_type": "w2_summary",
+            "error": str(e),
+        }
+
+
async def _get_income_summary(
+    db_session: AsyncSession,
user_id: str,
search_space_id: int,
tax_year: int | None,
) -> dict[str, Any]:
"""Get total income across all sources."""
- # TODO: Implement actual database queries
- # This would query W2s and 1099s to sum total income
- return {
- "query_type": "income_summary",
- "tax_year": tax_year or "all years",
- "total_w2_wages": Decimal("0.00"),
- "total_1099_misc_income": Decimal("0.00"),
- "total_interest_income": Decimal("0.00"),
- "total_dividend_income": Decimal("0.00"),
- "total_capital_gains": Decimal("0.00"),
- "grand_total_income": Decimal("0.00"),
- "message": "No tax forms uploaded yet. Please upload your W2 and 1099 forms to see income summary.",
- }
+    # Combines the per-form-type helpers into one result. Each helper returns a dict
+    # with has_data=True/False, or {"error": ...} (no "has_data") when its query failed.
+    try:
+        w2_data = await _get_w2_summary(db_session, user_id, search_space_id, tax_year)
+        interest_data = await _get_interest_income(db_session, user_id, search_space_id, tax_year)
+        dividend_data = await _get_dividends_income(db_session, user_id, search_space_id, tax_year)
+        capital_gains_data = await _get_capital_gains(db_session, user_id, search_space_id, tax_year)
+        misc_data = await _get_misc_income(db_session, user_id, search_space_id, tax_year)
+
+        breakdown = {
+            "w2": w2_data,
+            "interest": interest_data,
+            "dividends": dividend_data,
+            "capital_gains": capital_gains_data,
+            "misc": misc_data,
+        }
+
+        # Collect helper failures so a database error is never reported as
+        # "zero income" or "no forms uploaded".
+        errors = {name: data["error"] for name, data in breakdown.items() if data.get("error")}
+
+        total_w2_wages = w2_data.get("total_wages", 0) if w2_data.get("has_data") else 0
+        total_interest = interest_data.get("total_interest", 0) if interest_data.get("has_data") else 0
+        total_dividends = dividend_data.get("total_ordinary_dividends", 0) if dividend_data.get("has_data") else 0
+        total_capital_gains = capital_gains_data.get("net_capital_gains", 0) if capital_gains_data.get("has_data") else 0
+        total_misc = misc_data.get("total_misc_income", 0) if misc_data.get("has_data") else 0
+
+        grand_total = total_w2_wages + total_interest + total_dividends + total_capital_gains + total_misc
+
+        has_any_data = any(data.get("has_data") for data in breakdown.values())
+
+        if not has_any_data:
+            if errors:
+                # Nothing could be read and at least one lookup failed: report the
+                # failure instead of claiming that no forms exist.
+                return {
+                    "query_type": "income_summary",
+                    "error": "; ".join(f"{name}: {message}" for name, message in errors.items()),
+                }
+            return {
+                "query_type": "income_summary",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "No tax forms uploaded yet. Please upload your W2 and 1099 forms to see income summary.",
+            }
+
+        summary = {
+            "query_type": "income_summary",
+            "tax_year": tax_year or "all years",
+            "has_data": True,
+            "total_w2_wages": total_w2_wages,
+            "total_interest_income": total_interest,
+            "total_dividend_income": total_dividends,
+            "total_capital_gains": total_capital_gains,
+            "total_misc_income": total_misc,
+            "grand_total_income": grand_total,
+            "breakdown": breakdown,
+        }
+        if errors:
+            # Totals exclude the failed sources; flag that so the caller can say so.
+            summary["partial"] = True
+            summary["errors"] = errors
+        return summary
+    except Exception as e:
+        logger.exception("Error getting income summary")
+        return {"query_type": "income_summary", "error": str(e)}
async def _get_tax_summary(
+    db_session: AsyncSession,
user_id: str,
search_space_id: int,
tax_year: int | None,
) -> dict[str, Any]:
"""Get total taxes withheld across all sources."""
- # TODO: Implement actual database queries
- return {
- "query_type": "tax_summary",
- "tax_year": tax_year or "all years",
- "total_federal_withheld": Decimal("0.00"),
- "total_social_security_withheld": Decimal("0.00"),
- "total_medicare_withheld": Decimal("0.00"),
- "total_state_withheld": Decimal("0.00"),
- "grand_total_withheld": Decimal("0.00"),
- "message": "No tax forms uploaded yet. Please upload your W2 and 1099 forms to see tax withholdings.",
- }
+    # Federal withholding comes from W2, 1099-INT and 1099-DIV results; social
+    # security, Medicare and state withholding come from the W2 result only.
+    try:
+        w2_data = await _get_w2_summary(db_session, user_id, search_space_id, tax_year)
+        interest_data = await _get_interest_income(db_session, user_id, search_space_id, tax_year)
+        dividend_data = await _get_dividends_income(db_session, user_id, search_space_id, tax_year)
+
+        sources = {"w2": w2_data, "interest": interest_data, "dividends": dividend_data}
+
+        # A helper whose query failed returns {"error": ...} without "has_data".
+        # Collect those so a failure is not silently counted as zero withholding.
+        errors = {name: data["error"] for name, data in sources.items() if data.get("error")}
+
+        def withheld(data: dict[str, Any], key: str) -> Any:
+            """Return data[key] when the source has data, otherwise 0."""
+            return data.get(key, 0) if data.get("has_data") else 0
+
+        total_federal = (
+            withheld(w2_data, "total_federal_withheld")
+            + withheld(interest_data, "total_federal_withheld")
+            + withheld(dividend_data, "total_federal_withheld")
+        )
+        total_ss = withheld(w2_data, "total_social_security_withheld")
+        total_medicare = withheld(w2_data, "total_medicare_withheld")
+        total_state = withheld(w2_data, "total_state_withheld")
+
+        has_data = any(data.get("has_data") for data in sources.values())
+
+        if not has_data:
+            if errors:
+                # Every usable source failed: surface the failure rather than
+                # telling the user they have no forms.
+                return {
+                    "query_type": "tax_summary",
+                    "error": "; ".join(f"{name}: {message}" for name, message in errors.items()),
+                }
+            return {
+                "query_type": "tax_summary",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "No tax forms uploaded yet. Please upload your W2 and 1099 forms to see tax withholdings.",
+            }
+
+        summary = {
+            "query_type": "tax_summary",
+            "tax_year": tax_year or "all years",
+            "has_data": True,
+            "total_federal_withheld": total_federal,
+            "total_social_security_withheld": total_ss,
+            "total_medicare_withheld": total_medicare,
+            "total_state_withheld": total_state,
+            "grand_total_withheld": total_federal + total_ss + total_medicare + total_state,
+        }
+        if errors:
+            # Totals exclude the failed sources; flag that so the caller can say so.
+            summary["partial"] = True
+            summary["errors"] = errors
+        return summary
+    except Exception as e:
+        logger.exception("Error getting tax summary")
+        return {"query_type": "tax_summary", "error": str(e)}
async def _get_interest_income(
+    db_session: AsyncSession,
user_id: str,
search_space_id: int,
tax_year: int | None,
) -> dict[str, Any]:
"""Get interest income from 1099-INT forms."""
- # TODO: Implement actual database queries
- return {
- "query_type": "interest_income",
- "tax_year": tax_year or "all years",
- "total_interest": Decimal("0.00"),
- "sources": [],
- "message": "No 1099-INT forms found. Upload your interest income statements to see details.",
- }
+    # Loads the user's 1099-INT tax forms (optionally for one year) and totals the
+    # interest income and federal withholding, listing each payer separately.
+    try:
+        filters = [
+            TaxForm.user_id == UUID(user_id),
+            TaxForm.search_space_id == search_space_id,
+            TaxForm.form_type == "1099-INT",
+        ]
+        if tax_year:
+            filters.append(TaxForm.tax_year == tax_year)
+
+        statement = (
+            select(TaxForm)
+            .options(selectinload(TaxForm.form_1099_int))
+            .where(and_(*filters))
+        )
+        forms = (await db_session.execute(statement)).scalars().all()
+
+        if not forms:
+            return {
+                "query_type": "interest_income",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "No 1099-INT forms found. Upload your interest income statements to see details.",
+            }
+
+        zero = Decimal("0.00")
+        interest_sum = zero
+        withheld_sum = zero
+        sources = []
+
+        for tax_form in forms:
+            detail = tax_form.form_1099_int
+            if not detail:
+                # No extracted 1099-INT detail attached to this form; skip it.
+                continue
+
+            interest = detail.interest_income or zero
+            withheld = detail.federal_income_tax_withheld or zero
+            sources.append(
+                {
+                    "payer_name": detail.payer_name or "Unknown Payer",
+                    "tax_year": tax_form.tax_year,
+                    "interest_income": float(interest),
+                    "federal_withheld": float(withheld),
+                }
+            )
+            interest_sum += interest
+            withheld_sum += withheld
+
+        return {
+            "query_type": "interest_income",
+            "tax_year": tax_year or "all years",
+            "has_data": True,
+            "total_interest": float(interest_sum),
+            "total_federal_withheld": float(withheld_sum),
+            "sources": sources,
+        }
+    except Exception as e:
+        logger.exception("Error getting interest income")
+        return {"query_type": "interest_income", "error": str(e)}
async def _get_dividends_income(
+    db_session: AsyncSession,
user_id: str,
search_space_id: int,
tax_year: int | None,
) -> dict[str, Any]:
"""Get dividend income from 1099-DIV forms."""
- # TODO: Implement actual database queries
- return {
- "query_type": "dividends_income",
- "tax_year": tax_year or "all years",
- "total_ordinary_dividends": Decimal("0.00"),
- "total_qualified_dividends": Decimal("0.00"),
- "sources": [],
- "message": "No 1099-DIV forms found. Upload your dividend income statements to see details.",
- }
+    # Loads the user's 1099-DIV tax forms (optionally for one year) and totals
+    # ordinary/qualified dividends, capital gain distributions and federal
+    # withholding, listing each payer separately.
+    try:
+        filters = [
+            TaxForm.user_id == UUID(user_id),
+            TaxForm.search_space_id == search_space_id,
+            TaxForm.form_type == "1099-DIV",
+        ]
+        if tax_year:
+            filters.append(TaxForm.tax_year == tax_year)
+
+        statement = (
+            select(TaxForm)
+            .options(selectinload(TaxForm.form_1099_div))
+            .where(and_(*filters))
+        )
+        forms = (await db_session.execute(statement)).scalars().all()
+
+        if not forms:
+            return {
+                "query_type": "dividends_income",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "No 1099-DIV forms found. Upload your dividend income statements to see details.",
+            }
+
+        zero = Decimal("0.00")
+        ordinary_sum = zero
+        qualified_sum = zero
+        distributions_sum = zero
+        withheld_sum = zero
+        sources = []
+
+        for tax_form in forms:
+            detail = tax_form.form_1099_div
+            if not detail:
+                # No extracted 1099-DIV detail attached to this form; skip it.
+                continue
+
+            ordinary = detail.total_ordinary_dividends or zero
+            qualified = detail.qualified_dividends or zero
+            distributions = detail.total_capital_gain_distributions or zero
+            withheld = detail.federal_income_tax_withheld or zero
+
+            sources.append(
+                {
+                    "payer_name": detail.payer_name or "Unknown Payer",
+                    "tax_year": tax_form.tax_year,
+                    "ordinary_dividends": float(ordinary),
+                    "qualified_dividends": float(qualified),
+                    "capital_gain_distributions": float(distributions),
+                    "federal_withheld": float(withheld),
+                }
+            )
+
+            ordinary_sum += ordinary
+            qualified_sum += qualified
+            distributions_sum += distributions
+            withheld_sum += withheld
+
+        return {
+            "query_type": "dividends_income",
+            "tax_year": tax_year or "all years",
+            "has_data": True,
+            "total_ordinary_dividends": float(ordinary_sum),
+            "total_qualified_dividends": float(qualified_sum),
+            "total_capital_gain_distributions": float(distributions_sum),
+            "total_federal_withheld": float(withheld_sum),
+            "sources": sources,
+        }
+    except Exception as e:
+        logger.exception("Error getting dividends income")
+        return {"query_type": "dividends_income", "error": str(e)}
async def _get_capital_gains(
+    db_session: AsyncSession,
user_id: str,
search_space_id: int,
tax_year: int | None,
) -> dict[str, Any]:
"""Get capital gains from 1099-B forms."""
- # TODO: Implement actual database queries
- return {
- "query_type": "capital_gains",
- "tax_year": tax_year or "all years",
- "total_short_term_gains": Decimal("0.00"),
- "total_long_term_gains": Decimal("0.00"),
- "total_realized_gains": Decimal("0.00"),
- "transactions": [],
- "message": "No 1099-B forms found. Upload your brokerage statements to see capital gains.",
- }
+    # Loads the user's 1099-B tax forms (optionally for one year) and reports
+    # proceeds, cost basis and gain/loss (proceeds - cost basis) per broker and in total.
+    try:
+        query = (
+            select(TaxForm)
+            .options(selectinload(TaxForm.form_1099_b))
+            .where(
+                and_(
+                    TaxForm.user_id == UUID(user_id),
+                    TaxForm.search_space_id == search_space_id,
+                    TaxForm.form_type == "1099-B",
+                )
+            )
+        )
+
+        if tax_year:
+            query = query.where(TaxForm.tax_year == tax_year)
+
+        result = await db_session.execute(query)
+        tax_forms = result.scalars().all()
+
+        if not tax_forms:
+            return {
+                "query_type": "capital_gains",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "No 1099-B forms found. Upload your brokerage statements to see capital gains.",
+            }
+
+        sources = []
+        total_proceeds = Decimal("0.00")
+        total_cost_basis = Decimal("0.00")
+        missing_basis_count = 0
+
+        for form in tax_forms:
+            b_form = form.form_1099_b
+            if not b_form:
+                # No extracted 1099-B detail attached to this form; skip it.
+                continue
+
+            # A missing cost basis is counted as zero, so the whole proceeds show up
+            # as gain. Track it so the result can warn that the gain is overstated.
+            basis_reported = b_form.cost_basis is not None
+            if not basis_reported:
+                missing_basis_count += 1
+
+            proceeds = b_form.proceeds or Decimal("0.00")
+            cost_basis = b_form.cost_basis or Decimal("0.00")
+            gain_loss = proceeds - cost_basis
+
+            sources.append(
+                {
+                    "broker_name": b_form.payer_name or "Unknown Broker",
+                    "tax_year": form.tax_year,
+                    "proceeds": float(proceeds),
+                    "cost_basis": float(cost_basis),
+                    "gain_loss": float(gain_loss),
+                    "cost_basis_reported": basis_reported,
+                }
+            )
+
+            total_proceeds += proceeds
+            total_cost_basis += cost_basis
+
+        net_gains = total_proceeds - total_cost_basis
+
+        summary = {
+            "query_type": "capital_gains",
+            "tax_year": tax_year or "all years",
+            "has_data": True,
+            "total_proceeds": float(total_proceeds),
+            "total_cost_basis": float(total_cost_basis),
+            "net_capital_gains": float(net_gains),
+            "sources_missing_cost_basis": missing_basis_count,
+            "sources": sources,
+        }
+        if missing_basis_count:
+            summary["note"] = (
+                "Cost basis is missing for some 1099-B forms and was treated as zero, "
+                "so the reported gains may be overstated."
+            )
+        return summary
+    except Exception as e:
+        logger.exception("Error getting capital gains")
+        return {"query_type": "capital_gains", "error": str(e)}
-async def _get_w2_summary(
+async def _get_misc_income(
+    db_session: AsyncSession,
user_id: str,
search_space_id: int,
tax_year: int | None,
) -> dict[str, Any]:
- """Get W2 summary."""
- # TODO: Implement actual database queries
- return {
- "query_type": "w2_summary",
- "tax_year": tax_year or "all years",
- "employers": [],
- "total_wages": Decimal("0.00"),
- "total_federal_withheld": Decimal("0.00"),
- "total_social_security_withheld": Decimal("0.00"),
- "total_medicare_withheld": Decimal("0.00"),
- "message": "No W2 forms found. Upload your W2s to see employment income and withholdings.",
- }
+    """Get miscellaneous income from 1099-MISC forms.
+
+    Income per form is rents + royalties + other income; missing amounts count as zero.
+
+    Args:
+        db_session: Async SQLAlchemy session used to run the query.
+        user_id: Owner of the forms, as a UUID string.
+        search_space_id: Search space the forms belong to.
+        tax_year: Restrict to this tax year; a falsy value includes every year.
+
+    Returns:
+        A dict with "query_type" and either has_data=False plus a "message",
+        has_data=True with "total_misc_income" (float) and "sources", or an
+        "error" string if the lookup raised.
+    """
+    try:
+        query = (
+            select(TaxForm)
+            .options(selectinload(TaxForm.form_1099_misc))
+            .where(
+                and_(
+                    TaxForm.user_id == UUID(user_id),
+                    TaxForm.search_space_id == search_space_id,
+                    TaxForm.form_type == "1099-MISC",
+                )
+            )
+        )
+
+        if tax_year:
+            query = query.where(TaxForm.tax_year == tax_year)
+
+        result = await db_session.execute(query)
+        tax_forms = result.scalars().all()
+
+        if not tax_forms:
+            # Include a message like the other per-form helpers do, so the result
+            # reads the same wherever it is surfaced (e.g. in the income breakdown).
+            return {
+                "query_type": "misc_income",
+                "tax_year": tax_year or "all years",
+                "has_data": False,
+                "message": "No 1099-MISC forms found. Upload your 1099-MISC statements to see miscellaneous income.",
+            }
+
+        zero = Decimal("0.00")
+        sources = []
+        total_misc = zero
+
+        for form in tax_forms:
+            misc_form = form.form_1099_misc
+            if not misc_form:
+                # No extracted 1099-MISC detail attached to this form; skip it.
+                continue
+
+            misc_income = (
+                (misc_form.rents or zero)
+                + (misc_form.royalties or zero)
+                + (misc_form.other_income or zero)
+            )
+
+            sources.append(
+                {
+                    "payer_name": misc_form.payer_name or "Unknown Payer",
+                    "tax_year": form.tax_year,
+                    "rents": float(misc_form.rents or 0),
+                    "royalties": float(misc_form.royalties or 0),
+                    "other_income": float(misc_form.other_income or 0),
+                    "total": float(misc_income),
+                }
+            )
+            total_misc += misc_income
+
+        return {
+            "query_type": "misc_income",
+            "tax_year": tax_year or "all years",
+            "has_data": True,
+            "total_misc_income": float(total_misc),
+            "sources": sources,
+        }
+    except Exception as e:
+        logger.exception("Error getting misc income")
+        return {"query_type": "misc_income", "error": str(e)}
async def _get_all_forms(
+    db_session: AsyncSession,
user_id: str,
search_space_id: int,
tax_year: int | None,
form_types: list[str] | None,
) -> dict[str, Any]:
"""Get all tax forms with optional filters."""
- # TODO: Implement actual database queries
- return {
- "query_type": "all_forms",
- "tax_year": tax_year or "all years",
- "form_types_filter": form_types,
- "forms": [],
- "total_forms": 0,
- "message": "No tax forms uploaded yet. Upload W2s and 1099s to get started.",
- }
+    # Lists the user's tax forms, newest tax year first and then by form type,
+    # optionally narrowed to one tax year and/or a set of form types.
+    try:
+        filters = [
+            TaxForm.user_id == UUID(user_id),
+            TaxForm.search_space_id == search_space_id,
+        ]
+        if tax_year:
+            filters.append(TaxForm.tax_year == tax_year)
+        if form_types:
+            filters.append(TaxForm.form_type.in_(form_types))
+
+        statement = (
+            select(TaxForm)
+            .where(and_(*filters))
+            .order_by(TaxForm.tax_year.desc(), TaxForm.form_type)
+        )
+        rows = (await db_session.execute(statement)).scalars().all()
+
+        if not rows:
+            return {
+                "query_type": "all_forms",
+                "tax_year": tax_year or "all years",
+                "form_types_filter": form_types,
+                "has_data": False,
+                "total_forms": 0,
+                "message": "No tax forms uploaded yet. Upload W2s and 1099s to get started.",
+            }
+
+        forms_list = []
+        for row in rows:
+            uploaded = row.uploaded_at.isoformat() if row.uploaded_at else None
+            forms_list.append(
+                {
+                    "id": str(row.id),
+                    "form_type": row.form_type,
+                    "tax_year": row.tax_year,
+                    "processing_status": row.processing_status,
+                    "extraction_method": row.extraction_method,
+                    "needs_review": row.needs_review,
+                    "uploaded_at": uploaded,
+                }
+            )
+
+        return {
+            "query_type": "all_forms",
+            "tax_year": tax_year or "all years",
+            "form_types_filter": form_types,
+            "has_data": True,
+            "forms": forms_list,
+            "total_forms": len(forms_list),
+        }
+    except Exception as e:
+        logger.exception("Error getting all forms")
+        return {"query_type": "all_forms", "error": str(e)}
+
+
+async def _get_tax_estimate(
+    db_session: AsyncSession,
+    user_id: str,
+    search_space_id: int,
+    tax_year: int | None,
+) -> dict[str, Any]:
+    """Estimate federal tax liability and the resulting refund or amount owed.
+
+    This is a simplified estimate: single filer status, the 2024 standard
+    deduction and the 2024 brackets, with all income taxed at ordinary rates.
+    For accurate tax calculations, users should consult a tax professional.
+
+    Args:
+        db_session: Async SQLAlchemy session used to run the queries.
+        user_id: Owner of the forms, as a UUID string.
+        search_space_id: Search space the forms belong to.
+        tax_year: Year to estimate; a falsy value defaults to 2024 so that income
+            from several years is never added together.
+
+    Returns:
+        A dict with "query_type" and either has_data=False plus a "message",
+        has_data=True with the estimate figures (floats) and a breakdown, or an
+        "error" string if a lookup failed.
+    """
+    # The brackets and deduction describe one year, so the estimate must cover
+    # exactly one tax year.
+    bracket_year = 2024
+    estimate_year = tax_year or bracket_year
+
+    try:
+        # Get income summary first
+        income_data = await _get_income_summary(db_session, user_id, search_space_id, estimate_year)
+
+        if income_data.get("error"):
+            # A failed lookup must not be reported as "no forms uploaded".
+            return {"query_type": "tax_estimate", "error": income_data["error"]}
+
+        if not income_data.get("has_data"):
+            return {
+                "query_type": "tax_estimate",
+                "tax_year": estimate_year,
+                "has_data": False,
+                "message": "No tax forms uploaded yet. Please upload your W2 and 1099 forms to get a tax estimate.",
+            }
+
+        # Get withholdings
+        tax_data = await _get_tax_summary(db_session, user_id, search_space_id, estimate_year)
+
+        if tax_data.get("error"):
+            # Without withholdings the estimate would wrongly show the full tax as owed.
+            return {"query_type": "tax_estimate", "error": tax_data["error"]}
+
+        # Calculate estimated tax
+        gross_income = Decimal(str(income_data.get("grand_total_income", 0)))
+
+        # Apply standard deduction
+        taxable_income = max(Decimal("0.00"), gross_income - STANDARD_DEDUCTION_2024)
+
+        # Calculate tax using brackets
+        estimated_tax = _calculate_tax_from_brackets(taxable_income)
+
+        # Get total withholdings
+        total_withheld = Decimal(str(tax_data.get("total_federal_withheld", 0)))
+
+        # Calculate refund/owed
+        difference = total_withheld - estimated_tax
+
+        if difference > 0:
+            refund_or_owed = "refund"
+        elif difference < 0:
+            refund_or_owed = "owed"
+        else:
+            # Withholding exactly covers the estimated tax.
+            refund_or_owed = "even"
+        amount = float(abs(difference))
+
+        disclaimer = "This is a simplified estimate assuming single filer status with standard deduction. For accurate calculations, please consult a tax professional."
+        if estimate_year != bracket_year:
+            disclaimer += f" The {bracket_year} tax brackets and standard deduction were applied to tax year {estimate_year}."
+
+        return {
+            "query_type": "tax_estimate",
+            "tax_year": estimate_year,
+            "has_data": True,
+            "disclaimer": disclaimer,
+            "gross_income": float(gross_income),
+            "standard_deduction": float(STANDARD_DEDUCTION_2024),
+            "taxable_income": float(taxable_income),
+            "estimated_federal_tax": float(estimated_tax),
+            "total_federal_withheld": float(total_withheld),
+            "estimate_result": refund_or_owed,
+            "estimate_amount": amount,
+            "breakdown": {
+                "income": {
+                    "w2_wages": income_data.get("total_w2_wages", 0),
+                    "interest": income_data.get("total_interest_income", 0),
+                    "dividends": income_data.get("total_dividend_income", 0),
+                    "capital_gains": income_data.get("total_capital_gains", 0),
+                    "misc_income": income_data.get("total_misc_income", 0),
+                },
+                "withholdings": {
+                    "federal": tax_data.get("total_federal_withheld", 0),
+                    "social_security": tax_data.get("total_social_security_withheld", 0),
+                    "medicare": tax_data.get("total_medicare_withheld", 0),
+                    "state": tax_data.get("total_state_withheld", 0),
+                },
+            },
+        }
+    except Exception as e:
+        logger.exception("Error calculating tax estimate")
+        return {"query_type": "tax_estimate", "error": str(e)}
+
+
+def _calculate_tax_from_brackets(taxable_income: Decimal) -> Decimal:
+    """Apply the progressive TAX_BRACKETS_2024_SINGLE table to taxable income.
+
+    Each bracket taxes only the slice of income between the previous bracket's
+    upper bound and its own. Returns the tax rounded to cents; non-positive
+    income yields Decimal("0.00").
+    """
+    if taxable_income <= 0:
+        return Decimal("0.00")
+
+    owed = Decimal("0.00")
+    floor = Decimal("0")
+
+    for ceiling_raw, rate in TAX_BRACKETS_2024_SINGLE:
+        ceiling = Decimal(str(ceiling_raw))
+        reaches_top = taxable_income <= ceiling
+
+        # Tax the part of this bracket the income actually occupies.
+        slice_top = taxable_income if reaches_top else ceiling
+        owed += (slice_top - floor) * rate
+
+        if reaches_top:
+            break
+        floor = ceiling
+
+    return owed.quantize(Decimal("0.01"))
diff --git a/financegpt_backend/app/celery_app.py b/financegpt_backend/app/celery_app.py
index 36b0b64..f711d13 100644
--- a/financegpt_backend/app/celery_app.py
+++ b/financegpt_backend/app/celery_app.py
@@ -64,6 +64,7 @@ def parse_schedule_interval(interval: str) -> dict:
"app.tasks.celery_tasks.schedule_checker_task",
"app.tasks.celery_tasks.blocknote_migration_tasks",
"app.tasks.celery_tasks.document_reindex_tasks",
+ "app.tasks.celery_tasks.tax_form_tasks",
],
)
diff --git a/financegpt_backend/app/db.py b/financegpt_backend/app/db.py
index 3932bf1..5688f05 100644
--- a/financegpt_backend/app/db.py
+++ b/financegpt_backend/app/db.py
@@ -981,7 +981,7 @@ class TaxForm(BaseModel):
form_type = Column(String(20), nullable=False, index=True) # W2, 1099-MISC, 1099-INT, etc.
tax_year = Column(Integer, nullable=False, index=True)
document_id = Column(
- Integer, ForeignKey("documents.id", ondelete="SET NULL"), nullable=True
+ Integer, ForeignKey("documents.id", ondelete="CASCADE"), nullable=True
)
uploaded_at = Column(TIMESTAMP(timezone=True), server_default=text('now()'), nullable=False)
processed_at = Column(TIMESTAMP(timezone=True), nullable=True)
diff --git a/financegpt_backend/app/parsers/llm_csv_parser.py b/financegpt_backend/app/parsers/llm_csv_parser.py
index 563752d..6136c8d 100644
--- a/financegpt_backend/app/parsers/llm_csv_parser.py
+++ b/financegpt_backend/app/parsers/llm_csv_parser.py
@@ -32,6 +32,80 @@ def __init__(self):
"""Initialize LLM CSV parser."""
super().__init__("LLM CSV Parser")
+    def _skip_preamble_rows(self, text_content: str) -> str:
+        """
+        Skip preamble/header rows that appear before actual CSV data.
+
+        Many brokerage CSV exports include metadata rows before the actual CSV headers.
+        Examples:
+        - Schwab: "Positions for account ****9247 as of 01/29/2026 04:00 PM ET"
+        - Fidelity: "Brokerage" or account name rows
+        - E*TRADE: Account summary rows
+
+        Detection strategy:
+        1. Find the first line that looks like a proper CSV header row
+        2. A header row has multiple comma-separated fields (>=3)
+        3. At least 2 of the next 5 lines must have the same number of fields
+        4. Preamble rows typically have fewer fields or are single values
+
+        Args:
+            text_content: Raw CSV text content
+
+        Returns:
+            CSV text starting at the detected header row (surrounding whitespace
+            stripped), or the original text unchanged when no header row is detected
+        """
+        min_header_fields = 3
+        lookahead = 5
+        min_matching_rows = 2
+
+        stripped = text_content.strip()
+        if not stripped:
+            # Nothing to scan; str.split never returns an empty list, so test the text.
+            return text_content
+
+        lines = stripped.split('\n')
+
+        def count_csv_fields(line: str) -> int:
+            """Count fields in a CSV line, handling quoted values with commas."""
+            try:
+                # Use csv module to properly parse the line
+                reader = csv.reader(io.StringIO(line))
+                fields = next(reader, [])
+                return len(fields)
+            except Exception:
+                # Fallback: simple comma count (less accurate but safe)
+                return line.count(',') + 1
+
+        def is_likely_header_row(line: str, next_lines: list[str]) -> bool:
+            """Check if this line looks like a CSV header row."""
+            if not line.strip():
+                return False
+
+            field_count = count_csv_fields(line)
+
+            # Headers should have at least 3 fields
+            if field_count < min_header_fields:
+                return False
+
+            # Count how many of the following non-empty lines share the field count;
+            # matching rows indicate we found the actual CSV structure.
+            matching_lines = sum(
+                1
+                for next_line in next_lines[:lookahead]
+                if next_line.strip() and count_csv_fields(next_line) == field_count
+            )
+
+            # If at least 2 subsequent lines match, this is likely the header
+            return matching_lines >= min_matching_rows
+
+        # Find the first line that looks like a CSV header
+        for i, line in enumerate(lines):
+            if not line.strip():
+                continue
+
+            # Only the next few lines are inspected, so slice just those. Copying
+            # the whole remainder on every iteration is quadratic for large files
+            # in which no header row is found.
+            upcoming = lines[i + 1:i + 1 + lookahead]
+            if is_likely_header_row(line, upcoming):
+                if i > 0:
+                    logger.info(f"Skipped {i} preamble row(s) before CSV headers")
+                return '\n'.join(lines[i:])
+
+        # No preamble detected, return original content
+        return text_content
+
async def parse_file(
self,
file_content: bytes,
@@ -56,6 +130,11 @@ async def parse_file(
try:
# Decode CSV
text_content = file_content.decode("utf-8-sig")
+
+ # Handle CSV files with preamble/header rows before actual CSV data
+ # (e.g., Schwab files start with "Positions for account ****XXXX as of...")
+ text_content = self._skip_preamble_rows(text_content)
+
csv_reader = csv.DictReader(io.StringIO(text_content))
# Get headers and first few rows for LLM analysis
@@ -316,14 +395,18 @@ def safe_decimal(value: str | None, allow_negative: bool = False) -> Decimal | N
holdings = []
+ # Log schema for debugging
+ logger.debug(f"Applying holdings schema: {schema}")
+
for row in rows:
try:
# Extract symbol
symbol_col = schema.get("symbol", {}).get("column")
if not symbol_col or symbol_col not in row:
+ logger.debug(f"Symbol column '{symbol_col}' not found in row keys: {list(row.keys())}")
continue
symbol = str(row[symbol_col]).strip().upper()
- if not symbol or symbol in ["", "N/A", "Total", "TOTAL"]:
+ if not symbol or symbol in ["", "N/A", "Total", "TOTAL", "ACCOUNT TOTAL", "CASH & CASH INVESTMENTS"]:
continue
# Extract quantity using safe_decimal
diff --git a/financegpt_backend/app/parsers/tax_form_parser.py b/financegpt_backend/app/parsers/tax_form_parser.py
index 57f9fcd..e3a4bde 100644
--- a/financegpt_backend/app/parsers/tax_form_parser.py
+++ b/financegpt_backend/app/parsers/tax_form_parser.py
@@ -9,20 +9,76 @@
Each tier returns confidence scores. If confidence < 0.85, escalate to next tier.
"""
+import decimal
+import json
import logging
import re
from decimal import Decimal
from pathlib import Path
from typing import Any, Literal
+import litellm
import pdfplumber
+from pydantic import BaseModel, Field
from unstructured.partition.pdf import partition_pdf
-from app.utils.pii_masking import mask_tax_form_for_llm, validate_confidence_threshold
+from app.utils.pii_masking import mask_tax_form_for_llm, validate_confidence_threshold, mask_pii_in_text, recover_pii_from_mapping
logger = logging.getLogger(__name__)
+# Pydantic models for structured LLM extraction
+class W2ExtractedData(BaseModel):
+    """Schema for the W-2 fields requested from the LLM.
+
+    Field names mirror the keys listed in the W-2 system prompt of
+    ``TaxFormParser._extract_with_llm``. Every amount is optional because the
+    prompt tells the model to return null for anything it cannot find.
+    """
+    # Federal boxes 1-6
+    wages_tips_compensation: float | None = Field(None, description="Box 1: Wages, tips, other compensation")
+    federal_income_tax_withheld: float | None = Field(None, description="Box 2: Federal income tax withheld")
+    social_security_wages: float | None = Field(None, description="Box 3: Social security wages")
+    social_security_tax_withheld: float | None = Field(None, description="Box 4: Social security tax withheld")
+    medicare_wages: float | None = Field(None, description="Box 5: Medicare wages and tips")
+    medicare_tax_withheld: float | None = Field(None, description="Box 6: Medicare tax withheld")
+    # State tax fields (boxes 15-17)
+    state_code: str | None = Field(None, description="Box 15: State code (e.g., CA, NY)")
+    state_wages: float | None = Field(None, description="Box 16: State wages, tips, etc.")
+    state_income_tax: float | None = Field(None, description="Box 17: State income tax withheld")
+    # SSN/EIN are intentionally absent: they are masked before the LLM call (privacy)
+    employer_name: str | None = Field(None, description="Employer's name")
+    retirement_plan: bool = Field(False, description="Box 13: Retirement plan checkbox marked")
+
+
+class Form1099ExtractedData(BaseModel):
+    """Schema for the 1099 fields requested from the LLM.
+
+    One model covers the 1099-INT, 1099-DIV, 1099-B and 1099-MISC variants;
+    fields that do not apply to a given form are expected to stay None (the
+    1099 prompt asks for null on fields not present).
+    """
+    # 1099-INT
+    interest_income: float | None = Field(None, description="Interest income (1099-INT Box 1)")
+    # 1099-DIV
+    dividend_income: float | None = Field(None, description="Total ordinary dividends (1099-DIV Box 1a)")
+    qualified_dividends: float | None = Field(None, description="Qualified dividends (1099-DIV Box 1b)")
+    capital_gain_distributions: float | None = Field(None, description="Capital gain distributions (1099-DIV Box 2a)")
+    # 1099-B
+    proceeds: float | None = Field(None, description="Proceeds from sales (1099-B Box 1d)")
+    cost_basis: float | None = Field(None, description="Cost or other basis (1099-B Box 1e)")
+    # 1099-MISC
+    rents: float | None = Field(None, description="Rents (1099-MISC Box 1)")
+    other_income: float | None = Field(None, description="Other income (1099-MISC Box 3)")
+    # Common to all variants
+    federal_tax_withheld: float | None = Field(None, description="Federal income tax withheld")
+    payer_name: str | None = Field(None, description="Payer's name")
+    # Payer TIN is intentionally absent: the 1099 prompt states TINs are masked (privacy)
+
+
+class Form1095CExtractedData(BaseModel):
+    """Schema for the 1095-C fields requested from the LLM.
+
+    Form 1095-C is the Employer-Provided Health Insurance Offer and Coverage
+    form. Field names mirror the keys listed in the 1095-C system prompt of
+    ``TaxFormParser._extract_with_llm``.
+    """
+    # Part I - Employee Information
+    employer_name: str | None = Field(None, description="Part I Line 1: Employer name")
+    employer_contact_phone: str | None = Field(None, description="Part I Line 9: Employer contact phone")
+
+    # Part II - Employee Offer and Coverage (Line 14-16 codes for each month)
+    offer_of_coverage_code: str | None = Field(None, description="Line 14: Offer of Coverage code (e.g., 1A, 1B, 1C, 1E, 1H)")
+    employee_share_lowest_cost: float | None = Field(None, description="Line 15: Employee share of lowest cost monthly premium")
+    safe_harbor_code: str | None = Field(None, description="Line 16: Safe Harbor code (e.g., 2C, 2D, 2F)")
+
+    # Coverage months (Part II Line 14 - which months had coverage).
+    # covered_months is only meaningful when covered_all_12_months is False.
+    covered_all_12_months: bool = Field(False, description="Whether coverage was offered for all 12 months")
+    covered_months: list[str] | None = Field(None, description="List of months with coverage if not all 12")
+
+    # Part III - Covered Individuals (if applicable); only a head count, no names or SSNs
+    covered_individuals_count: int | None = Field(None, description="Number of covered individuals listed in Part III")
+
+
class TaxFormParser:
"""Hybrid tax form parser with tiered extraction."""
@@ -41,11 +97,71 @@ def __init__(self):
"""Initialize parser."""
self.extraction_history: list[dict[str, Any]] = []
+    async def parse_from_text(
+        self,
+        text: str,
+        form_type: Literal["W2", "1099-MISC", "1099-INT", "1099-DIV", "1099-B", "1095-C"],
+        tax_year: int,
+        llm_model: str | None = None,
+    ) -> dict[str, Any]:
+        """Extract tax form fields from text that was already pulled out of the PDF.
+
+        The LLM extractor runs first. When it yields nothing (an empty result),
+        the form-specific heuristic parser is used instead.
+
+        Args:
+            text: Pre-extracted document text.
+            form_type: Which tax form the text represents.
+            tax_year: Tax year for the form.
+            llm_model: Optional LLM model string (e.g., "gemini/gemini-2.0-flash"),
+                forwarded to the LLM extractor.
+
+        Returns:
+            Dictionary with ``extracted_data``, ``confidence_scores``,
+            ``extraction_method`` ("llm_assisted" or "heuristic"),
+            ``needs_review`` and ``raw_extraction_data``.
+
+        Raises:
+            ValueError: If the LLM produced nothing and ``form_type`` has no
+                heuristic parser.
+        """
+        logger.info(f"Parsing {form_type} from pre-extracted text ({len(text)} chars) using LLM")
+
+        method_used = "llm_assisted"
+        fields = await self._extract_with_llm(text, form_type, tax_year, llm_model=llm_model)
+
+        if not fields:
+            logger.warning(f"LLM extraction failed for {form_type}, falling back to heuristic parsing")
+            # Heuristic parser method name per supported form type
+            fallback_parsers = {
+                "W2": "_parse_w2_text",
+                "1099-MISC": "_parse_1099_misc_text",
+                "1099-INT": "_parse_1099_int_text",
+                "1099-DIV": "_parse_1099_div_text",
+                "1099-B": "_parse_1099_b_text",
+                "1095-C": "_parse_1095_c_text",
+            }
+            parser_name = fallback_parsers.get(form_type)
+            if parser_name is None:
+                raise ValueError(f"Unsupported form type: {form_type}")
+            fields = getattr(self, parser_name)(text)
+            method_used = "heuristic"
+
+        scores = self._calculate_confidence_scores(fields)
+
+        return {
+            "extracted_data": fields,
+            "confidence_scores": scores,
+            "extraction_method": method_used,
+            "needs_review": not self._meets_confidence_threshold(scores),
+            "raw_extraction_data": {"text_length": len(text)},
+        }
+
async def parse_tax_form(
self,
file_path: str | Path,
- form_type: Literal["W2", "1099-MISC", "1099-INT", "1099-DIV", "1099-B"],
+ form_type: Literal["W2", "1099-MISC", "1099-INT", "1099-DIV", "1099-B", "1095-C"],
tax_year: int,
+ llm_model: str | None = None,
) -> dict[str, Any]:
"""Parse tax form using tiered extraction strategy.
@@ -53,6 +169,7 @@ async def parse_tax_form(
file_path: Path to PDF file
form_type: Type of tax form
tax_year: Tax year for the form
+ llm_model: Optional LLM model string for fallback extraction
Returns:
Dictionary containing:
@@ -249,13 +366,6 @@ async def _extract_llm_assisted(
Best for: Verification of low-confidence fields, unusual layouts.
IMPORTANT: All PII is masked before sending to LLM.
"""
- # TODO: Implement LLM-assisted extraction using instructor
- # This would:
- # 1. Convert PDF to image or extract text
- # 2. Mask any PII found in previous extraction
- # 3. Send to LLM with structured output schema
- # 4. Return verified/extracted data
-
logger.warning("LLM-assisted extraction not yet implemented")
# For now, return previous result marked as needs_review
@@ -272,39 +382,217 @@ async def _extract_llm_assisted(
"raw_extraction_data": {"status": "not_implemented"},
}
+    async def _extract_with_llm(
+        self,
+        text: str,
+        form_type: str,
+        tax_year: int,
+        llm_model: str | None = None,
+    ) -> dict[str, Any]:
+        """Use LLM to extract structured data from tax form text.
+
+        The text is passed through ``mask_pii_in_text`` first and only the first
+        8000 characters of the masked text are sent to the model. The model is
+        asked for a JSON object; numeric values are converted to ``Decimal`` and
+        boolean flags are kept as booleans.
+
+        Args:
+            text: The document text to parse
+            form_type: Type of tax form ("W2" and "1095-C" have dedicated
+                prompts; anything else gets the generic 1099 prompt)
+            tax_year: Tax year
+            llm_model: Optional LLM model string. If not provided, uses default.
+
+        Returns:
+            Mapping of field name to extracted value with null fields omitted,
+            or an empty dict when the call or the response parsing fails (the
+            error is logged, not raised).
+        """
+        # Use provided model or fall back to default
+        model = llm_model or "gemini/gemini-2.0-flash"
+        logger.info(f"Using LLM extraction for {form_type} (tax year {tax_year}) with model: {model}")
+
+        # Mask PII before sending to LLM
+        masked_text, pii_mapping = mask_pii_in_text(text)
+        if pii_mapping:
+            logger.info(f"Masked {len(pii_mapping)} PII items before LLM call")
+
+        # Select the appropriate prompt based on form type
+        if form_type == "W2":
+            system_prompt = """You are a tax document parser. Extract W-2 form data from the provided text.
+
+Extract these specific fields:
+- wages_tips_compensation: Box 1 - Wages, tips, other compensation (the main income amount)
+- federal_income_tax_withheld: Box 2 - Federal income tax withheld
+- social_security_wages: Box 3 - Social security wages (often same as Box 1, capped at ~$176,100 for 2025)
+- social_security_tax_withheld: Box 4 - Social security tax withheld (about 6.2% of Box 3)
+- medicare_wages: Box 5 - Medicare wages and tips (often same as Box 1)
+- medicare_tax_withheld: Box 6 - Medicare tax withheld (about 1.45% of Box 5)
+- state_code: Box 15 - Two-letter state code (e.g., CA, NY, TX)
+- state_wages: Box 16 - State wages, tips, etc.
+- state_income_tax: Box 17 - State income tax withheld
+- employer_name: Name of the employer
+- retirement_plan: True if Box 13 retirement plan is checked
+
+Note: SSN and EIN have been masked for privacy (shown as [SSN:***-**-XXXX] or [EIN:XXXXXXXX]).
+Do NOT try to extract SSN or EIN values.
+
+Important:
+- Look for dollar amounts formatted as $XX,XXX.XX or XX,XXX.XX
+- The wages amount is typically the largest number (often $50,000 - $500,000 range)
+- Federal tax withheld is typically 10-35% of wages
+- Return null for any field you cannot find
+- Do NOT guess values - only extract what you can clearly identify"""
+        elif form_type == "1095-C":
+            system_prompt = """You are a tax document parser. Extract Form 1095-C data from the provided text.
+
+Form 1095-C is the Employer-Provided Health Insurance Offer and Coverage form.
+
+Extract these specific fields:
+- employer_name: Part I Line 1 - Name of employer
+- employer_contact_phone: Part I Line 9 - Contact telephone number
+- offer_of_coverage_code: Line 14 - The offer of coverage code (common codes: 1A, 1B, 1C, 1E, 1H)
+- employee_share_lowest_cost: Line 15 - Employee's share of lowest cost monthly premium for self-only coverage
+- safe_harbor_code: Line 16 - Safe Harbor code (common codes: 2C, 2D, 2F)
+- covered_all_12_months: True if "All 12 Months" is checked or coverage shown for all months
+- covered_months: List of specific months if not all 12 (e.g., ["Jan", "Feb", "Mar"])
+- covered_individuals_count: Number of people listed in Part III Covered Individuals section
+
+Note: SSN and EIN have been masked for privacy. Do NOT try to extract SSN or EIN values.
+
+Important:
+- Look for the coverage codes in Line 14, 15, 16 columns
+- Return null for any field you cannot find
+- Do NOT guess values - only extract what you can clearly identify"""
+        else:
+            system_prompt = """You are a tax document parser. Extract 1099 form data from the provided text.
+
+Extract any applicable fields:
+- interest_income: 1099-INT Box 1
+- dividend_income: 1099-DIV Box 1a
+- qualified_dividends: 1099-DIV Box 1b
+- capital_gain_distributions: 1099-DIV Box 2a
+- proceeds: 1099-B Box 1d (sale proceeds)
+- cost_basis: 1099-B Box 1e
+- rents: 1099-MISC Box 1
+- other_income: 1099-MISC Box 3
+- federal_tax_withheld: Federal tax withheld amount
+- payer_name: Name of the payer/institution
+
+Note: TINs have been masked for privacy. Do NOT try to extract TIN/SSN/EIN values.
+
+Return null for fields not present in this specific form."""
+
+        # Use masked text in the prompt (first 8000 chars)
+        text_for_llm = masked_text[:8000]
+
+        user_prompt = f"""Extract tax form data from this {form_type} for tax year {tax_year}:
+
+{text_for_llm}
+
+Return the extracted values as JSON."""
+
+        try:
+            # Use litellm for LLM call with structured output
+            response = await litellm.acompletion(
+                model=model,  # Use user's configured model or default
+                messages=[
+                    {"role": "system", "content": system_prompt},
+                    {"role": "user", "content": user_prompt},
+                ],
+                response_format={"type": "json_object"},
+                temperature=0,  # Deterministic
+            )
+
+            # Parse the response
+            content = response.choices[0].message.content
+            extracted_json = json.loads(content)
+
+            # Valid JSON is not necessarily an object; anything else has no fields
+            if not isinstance(extracted_json, dict):
+                logger.error(
+                    f"LLM extraction failed: expected a JSON object, got {type(extracted_json).__name__}"
+                )
+                return {}
+
+            logger.info(f"LLM extracted data: {extracted_json}")
+
+            # Convert to our internal format.
+            # These flags are stored as returned, even if the model sends 0/1.
+            passthrough_keys = {"retirement_plan", "covered_all_12_months"}
+            extracted_data = {}
+            for key, value in extracted_json.items():
+                if value is None:
+                    continue
+                # bool is a subclass of int: without this check a stray true/false
+                # would hit Decimal("True"), raise, and discard the whole extraction.
+                is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
+                if is_number and key not in passthrough_keys:
+                    extracted_data[key] = Decimal(str(value))
+                else:
+                    extracted_data[key] = value
+
+            # Map LLM field names to database field names
+            # 1099-DIV fields: LLM uses dividend_income, DB uses total_ordinary_dividends
+            field_mappings = {
+                "dividend_income": "total_ordinary_dividends",
+                "capital_gain_distributions": "total_capital_gain_distributions",
+            }
+
+            for llm_name, db_name in field_mappings.items():
+                if llm_name in extracted_data and db_name not in extracted_data:
+                    extracted_data[db_name] = extracted_data.pop(llm_name)
+
+            return extracted_data
+
+        except Exception as e:
+            # Best effort by design: callers treat an empty dict as "LLM failed"
+            logger.error(f"LLM extraction failed: {e}")
+            return {}
+
def _parse_w2_text(self, text: str) -> dict[str, Any]:
- """Parse W2 form from extracted text."""
+ """Parse W2 form from extracted text.
+
+        Uses a two-phase approach:
+        1. First, find all realistic dollar amounts in the text
+        2. Map them to W2 boxes by relative size (largest = wages, etc.);
+           Social Security and Medicare figures are derived from the wages
+ """
data: dict[str, Any] = {}
- # Box 1: Wages, tips, other compensation
- wages_match = re.search(r"(?:Wages|Box 1).*?(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)", text, re.IGNORECASE)
- if wages_match:
- data["wages_tips_compensation"] = self._parse_money(wages_match.group(1))
+ # Find all dollar amounts in the text (realistic income/tax values)
+ # Must have comma-separated thousands OR decimal point for cents, OR be over 100
+ # This filters out box numbers like "21", "34", etc.
+ dollar_amounts = self._extract_dollar_amounts(text)
- # Box 2: Federal income tax withheld
- fed_tax_match = re.search(r"(?:Federal.*?tax.*?withheld|Box 2).*?(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)", text, re.IGNORECASE)
- if fed_tax_match:
- data["federal_income_tax_withheld"] = self._parse_money(fed_tax_match.group(1))
+ logger.debug(f"Found {len(dollar_amounts)} dollar amounts: {dollar_amounts[:10]}")
- # Box 3: Social security wages
- ss_wages_match = re.search(r"(?:Social security wages|Box 3).*?(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)", text, re.IGNORECASE)
- if ss_wages_match:
- data["social_security_wages"] = self._parse_money(ss_wages_match.group(1))
+        # For each W2 field, pick the most likely value by relative magnitude
+ # Box 1: Wages, tips, other compensation (usually the largest amount)
+ if dollar_amounts:
+ # Sort by value descending - wages is typically the largest
+ sorted_amounts = sorted(dollar_amounts, key=lambda x: x[0], reverse=True)
+
+ # Wages is usually the largest amount
+ data["wages_tips_compensation"] = sorted_amounts[0][0]
+
+ # Federal tax withheld is usually second or third largest
+ if len(sorted_amounts) > 1:
+                # Look for one that's roughly 5-40% of wages
+ wages = float(sorted_amounts[0][0])
+ for amount, context in sorted_amounts[1:]:
+ amount_float = float(amount)
+ if wages > 0 and 0.05 <= amount_float / wages <= 0.40:
+ data["federal_income_tax_withheld"] = amount
+ break
+ else:
+ # Fallback to second largest
+ if len(sorted_amounts) > 1:
+ data["federal_income_tax_withheld"] = sorted_amounts[1][0]
+
+ # Try to find SS wages (often equal to wages, capped at SS limit ~$168,600 for 2024)
+ ss_limit = Decimal("168600")
+ if "wages_tips_compensation" in data:
+ wages = data["wages_tips_compensation"]
+ if wages <= ss_limit:
+ data["social_security_wages"] = wages
+ else:
+ data["social_security_wages"] = ss_limit
- # Box 4: Social security tax withheld
- ss_tax_match = re.search(r"(?:Social security tax|Box 4).*?(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)", text, re.IGNORECASE)
- if ss_tax_match:
- data["social_security_tax_withheld"] = self._parse_money(ss_tax_match.group(1))
+ # SS tax is 6.2% of SS wages
+ if "social_security_wages" in data:
+ data["social_security_tax_withheld"] = (data["social_security_wages"] * Decimal("0.062")).quantize(Decimal("0.01"))
- # Box 5: Medicare wages
- medicare_wages_match = re.search(r"(?:Medicare wages|Box 5).*?(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)", text, re.IGNORECASE)
- if medicare_wages_match:
- data["medicare_wages"] = self._parse_money(medicare_wages_match.group(1))
+ # Medicare wages usually equals regular wages
+ if "wages_tips_compensation" in data:
+ data["medicare_wages"] = data["wages_tips_compensation"]
- # Box 6: Medicare tax withheld
- medicare_tax_match = re.search(r"(?:Medicare tax|Box 6).*?(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)", text, re.IGNORECASE)
- if medicare_tax_match:
- data["medicare_tax_withheld"] = self._parse_money(medicare_tax_match.group(1))
+ # Medicare tax is 1.45% of Medicare wages
+ if "medicare_wages" in data:
+ data["medicare_tax_withheld"] = (data["medicare_wages"] * Decimal("0.0145")).quantize(Decimal("0.01"))
# Extract SSN (will be hashed before storage)
ssn_match = re.search(self.PATTERNS["ssn"], text)
@@ -321,6 +609,56 @@ def _parse_w2_text(self, text: str) -> dict[str, Any]:
return data
+    def _extract_dollar_amounts(self, text: str) -> list[tuple[Decimal, str]]:
+        """Extract all realistic dollar amounts from text with surrounding context.
+
+        Amounts below 100 are dropped (likely box numbers, dates, etc.) and each
+        distinct value is reported once, with the context of its first match.
+
+        Args:
+            text: Raw text of the tax form.
+
+        Returns:
+            List of ``(amount, context)`` tuples in discovery order (pattern by
+            pattern, then position in the text); the list is NOT sorted.
+            ``context`` is up to 50 characters either side of the match.
+        """
+        amounts = []
+
+        # Pattern for dollar amounts:
+        # - Optional $
+        # - Either: comma-separated thousands (1,000+)
+        # - Or: decimal values without commas (100.00+)
+        # Bare integers such as 2024 or 94105 are not matched: they cannot be
+        # told apart from years, ZIP codes and control numbers.
+        # NOTE(review): this pattern list was rebuilt from the description above;
+        # confirm the exact expressions against the intended behaviour.
+        patterns = [
+            # $1,234.56 or $1,234
+            r'\$\s*(\d{1,3}(?:,\d{3})+(?:\.\d{2})?)',
+            # 1,234.56 or 1,234 (with commas, no $)
+            r'(?<![\d$,.])(\d{1,3}(?:,\d{3})+(?:\.\d{2})?)(?![\d,])',
+            # 1234.56 (no commas, always with cents)
+            r'(?<![\d$,.])(\d{3,}\.\d{2})(?!\d)',
+        ]
+
+        for pattern in patterns:
+            for match in re.finditer(pattern, text):
+                try:
+                    amount = Decimal(match.group(1).replace(',', ''))
+                    # Only include amounts >= $100 (filters out box numbers, dates, etc.)
+                    if amount >= 100:
+                        # Get context (50 chars before and after)
+                        start = max(0, match.start() - 50)
+                        end = min(len(text), match.end() + 50)
+                        context = text[start:end]
+                        amounts.append((amount, context))
+                except (ValueError, decimal.InvalidOperation):
+                    continue
+
+        # Remove duplicates by value (the same figure is often matched by more
+        # than one pattern, and repeated across copies of the form)
+        seen = set()
+        unique = []
+        for amount, context in amounts:
+            if amount not in seen:
+                seen.add(amount)
+                unique.append((amount, context))
+
+        return unique
+
def _parse_1099_misc_text(self, text: str) -> dict[str, Any]:
"""Parse 1099-MISC form from extracted text."""
data: dict[str, Any] = {}
@@ -409,6 +747,35 @@ def _parse_1099_b_text(self, text: str) -> dict[str, Any]:
return data
+    def _parse_1095_c_text(self, text: str) -> dict[str, Any]:
+        """Heuristically pull 1095-C fields (employer health coverage) out of raw text.
+
+        Each field is located with a case-insensitive regex; fields that are not
+        found are left out of the result. ``covered_all_12_months`` is always
+        present and reflects whether the phrase "all 12 months" occurs.
+        """
+
+        def first_capture(pattern: str) -> str | None:
+            # First capture group of the first match, or None when nothing matches
+            found = re.search(pattern, text, re.IGNORECASE)
+            return found.group(1) if found else None
+
+        result: dict[str, Any] = {}
+
+        # Employer name
+        employer = first_capture(r"(?:Employer|Company).*?name[:\s]+([A-Za-z0-9\s,\.]+?)(?:\n|$)")
+        if employer is not None:
+            result["employer_name"] = employer.strip()
+
+        # Line 14: Offer of Coverage code (1A, 1B, 1C, 1E, 1H, etc.)
+        offer_code = first_capture(r"(?:Line\s*14|offer.*?coverage).*?\b(1[A-HJ-Z])\b")
+        if offer_code is not None:
+            result["offer_of_coverage_code"] = offer_code.upper()
+
+        # Line 15: Employee share of lowest cost monthly premium
+        premium = first_capture(
+            r"(?:Line\s*15|lowest.*?cost|employee.*?share).*?(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)"
+        )
+        if premium is not None:
+            result["employee_share_lowest_cost"] = self._parse_money(premium)
+
+        # Line 16: Safe Harbor code (2C, 2D, 2F, etc.)
+        harbor_code = first_capture(r"(?:Line\s*16|safe.*?harbor).*?\b(2[A-HJ-Z])\b")
+        if harbor_code is not None:
+            result["safe_harbor_code"] = harbor_code.upper()
+
+        # "All 12 Months" coverage flag
+        all_year = re.search(r"all\s*12\s*months", text, re.IGNORECASE)
+        result["covered_all_12_months"] = all_year is not None
+
+        return result
+
def _parse_money(self, money_str: str) -> Decimal:
"""Parse money string to Decimal."""
# Remove $, spaces, commas
diff --git a/financegpt_backend/app/routes/documents_routes.py b/financegpt_backend/app/routes/documents_routes.py
index 9bae45b..121b4bc 100644
--- a/financegpt_backend/app/routes/documents_routes.py
+++ b/financegpt_backend/app/routes/documents_routes.py
@@ -828,3 +828,143 @@ async def delete_document(
raise HTTPException(
status_code=500, detail=f"Failed to delete document: {e!s}"
) from e
+
+
+@router.get("/tax-forms")
+async def get_tax_forms(
+    search_space_id: int,
+    session: AsyncSession = Depends(get_async_session),
+    user: User = Depends(current_active_user),
+):
+    """
+    Get all tax forms for the current user in a search space.
+    """
+    from app.db import TaxForm
+
+    def _iso_or_none(moment):
+        # Timestamps may be unset; those serialize as None
+        return moment.isoformat() if moment else None
+
+    def _serialize(form):
+        # JSON-friendly summary of one TaxForm row
+        return {
+            "id": str(form.id),
+            "form_type": form.form_type,
+            "tax_year": form.tax_year,
+            "document_id": form.document_id,
+            "processing_status": form.processing_status,
+            "extraction_method": form.extraction_method,
+            "needs_review": form.needs_review,
+            "uploaded_at": _iso_or_none(form.uploaded_at),
+            "processed_at": _iso_or_none(form.processed_at),
+        }
+
+    try:
+        # Caller needs read access to the search space
+        await check_permission(
+            session,
+            user,
+            search_space_id,
+            Permission.DOCUMENTS_READ.value,
+            "You don't have permission to read documents in this search space",
+        )
+
+        # Only the caller's own forms, newest tax year / upload first
+        query = (
+            select(TaxForm)
+            .where(
+                TaxForm.user_id == user.id,
+                TaxForm.search_space_id == search_space_id,
+            )
+            .order_by(TaxForm.tax_year.desc(), TaxForm.uploaded_at.desc())
+        )
+        rows = (await session.execute(query)).scalars().all()
+
+        return {"tax_forms": [_serialize(form) for form in rows]}
+    except HTTPException:
+        # Permission errors keep their own status code
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=500, detail=f"Failed to get tax forms: {e!s}"
+        ) from e
+
+
+@router.post("/tax-forms/{tax_form_id}/reprocess")
+async def reprocess_tax_form(
+    tax_form_id: str,
+    session: AsyncSession = Depends(get_async_session),
+    user: User = Depends(current_active_user),
+):
+    """
+    Re-process a stuck or failed tax form.
+
+    This will re-trigger the parsing task for a tax form that is stuck in 'processing'
+    or failed status.
+    """
+    from uuid import UUID
+    from app.db import TaxForm, Document
+
+    try:
+        # A malformed ID is a client error: reject it here instead of letting
+        # UUID() raise into the generic handler below, which answers 500.
+        try:
+            form_uuid = UUID(tax_form_id)
+        except ValueError as e:
+            raise HTTPException(status_code=400, detail="Invalid tax form ID") from e
+
+        # Get the tax form
+        result = await session.execute(
+            select(TaxForm).where(TaxForm.id == form_uuid)
+        )
+        tax_form = result.scalar_one_or_none()
+
+        if not tax_form:
+            raise HTTPException(status_code=404, detail="Tax form not found")
+
+        # Only the owner may reprocess a form
+        if tax_form.user_id != user.id:
+            raise HTTPException(status_code=403, detail="Not authorized to reprocess this tax form")
+
+        # Check permission
+        await check_permission(
+            session,
+            user,
+            tax_form.search_space_id,
+            Permission.DOCUMENTS_CREATE.value,
+            "You don't have permission to reprocess documents in this search space",
+        )
+
+        # Get the associated document for the content
+        extracted_text = None
+        if tax_form.document_id:
+            doc_result = await session.execute(
+                select(Document).where(Document.id == tax_form.document_id)
+            )
+            document = doc_result.scalar_one_or_none()
+            if document and document.content:
+                # The stored document content stands in for the original file
+                extracted_text = document.content
+
+        if not extracted_text:
+            raise HTTPException(
+                status_code=400,
+                detail="Cannot reprocess: no document content available for this tax form"
+            )
+
+        # Reset status (committed before the task is queued)
+        tax_form.processing_status = 'processing'
+        tax_form.extraction_method = None
+        tax_form.needs_review = False
+        await session.commit()
+
+        # Trigger the parsing task
+        from app.tasks.celery_tasks.tax_form_tasks import parse_tax_form_task
+
+        parse_tax_form_task.delay(
+            tax_form_id=str(tax_form.id),
+            file_path=None,  # No file path available
+            form_type=tax_form.form_type,
+            tax_year=tax_form.tax_year,
+            user_id=str(tax_form.user_id),
+            search_space_id=tax_form.search_space_id,
+            extracted_text=extracted_text,
+        )
+
+        return {
+            "message": "Tax form reprocessing started",
+            "tax_form_id": str(tax_form.id),
+            "form_type": tax_form.form_type,
+            "tax_year": tax_form.tax_year,
+        }
+    except HTTPException:
+        raise
+    except Exception as e:
+        await session.rollback()
+        raise HTTPException(
+            status_code=500, detail=f"Failed to reprocess tax form: {e!s}"
+        ) from e
diff --git a/financegpt_backend/app/services/plaid_service.py b/financegpt_backend/app/services/plaid_service.py
index a7c2827..2d09d5a 100644
--- a/financegpt_backend/app/services/plaid_service.py
+++ b/financegpt_backend/app/services/plaid_service.py
@@ -13,6 +13,12 @@
from plaid.exceptions import ApiException
from plaid.model.accounts_get_request import AccountsGetRequest
from plaid.model.country_code import CountryCode
+from plaid.model.investments_holdings_get_request import (
+ InvestmentsHoldingsGetRequest,
+)
+from plaid.model.investments_transactions_get_request import (
+ InvestmentsTransactionsGetRequest,
+)
from plaid.model.item_public_token_exchange_request import (
ItemPublicTokenExchangeRequest,
)
@@ -23,12 +29,6 @@
from plaid.model.transactions_get_request_options import (
TransactionsGetRequestOptions,
)
-from plaid.model.investments_holdings_get_request import (
- InvestmentsHoldingsGetRequest,
-)
-from plaid.model.investments_transactions_get_request import (
- InvestmentsTransactionsGetRequest,
-)
from app.config import config
diff --git a/financegpt_backend/app/tasks/celery_tasks/tax_form_tasks.py b/financegpt_backend/app/tasks/celery_tasks/tax_form_tasks.py
new file mode 100644
index 0000000..b25e0f5
--- /dev/null
+++ b/financegpt_backend/app/tasks/celery_tasks/tax_form_tasks.py
@@ -0,0 +1,435 @@
+"""Celery tasks for tax form parsing."""
+
+import logging
+from decimal import Decimal
+from typing import Optional
+from uuid import UUID
+
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
+from sqlalchemy.pool import NullPool
+
+from app.celery_app import celery_app
+from app.config import config
+
+logger = logging.getLogger(__name__)
+
+
+def get_celery_session_maker():
+    """
+    Build an async session factory for use inside a Celery task.
+
+    Every call creates a fresh engine on ``config.DATABASE_URL``. NullPool is
+    used so connections are not pooled, avoiding connection pooling issues in
+    Celery workers.
+    """
+    engine_options = {"poolclass": NullPool, "echo": False}
+    task_engine = create_async_engine(config.DATABASE_URL, **engine_options)
+    return async_sessionmaker(task_engine, expire_on_commit=False)
+
+
+@celery_app.task(name="parse_tax_form", bind=True)
+def parse_tax_form_task(
+    self,
+    tax_form_id: str,
+    file_path: str | None,
+    form_type: str,
+    tax_year: int,
+    user_id: str,
+    search_space_id: int,
+    extracted_text: str | None = None,
+):
+    """
+    Synchronous Celery entry point that runs the async tax form parser.
+
+    A dedicated event loop is created for the call and closed afterwards.
+
+    Args:
+        tax_form_id: UUID of the tax_form record
+        file_path: Path to the uploaded PDF file (optional if extracted_text provided)
+        form_type: Type of tax form (W2, 1099-MISC, etc.)
+        tax_year: Tax year for the form
+        user_id: User ID who uploaded the form
+        search_space_id: Search space ID for LLM config lookup
+        extracted_text: Pre-extracted text content (used if file_path not available)
+    """
+    import asyncio
+
+    # Arguments are forwarded unchanged to the async implementation
+    job_kwargs = {
+        "tax_form_id": tax_form_id,
+        "file_path": file_path,
+        "form_type": form_type,
+        "tax_year": tax_year,
+        "user_id": user_id,
+        "search_space_id": search_space_id,
+        "extracted_text": extracted_text,
+    }
+
+    event_loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(event_loop)
+
+    try:
+        event_loop.run_until_complete(_parse_tax_form(**job_kwargs))
+    finally:
+        # Always release the loop, even when parsing raised
+        event_loop.close()
+
+
+async def _parse_tax_form(
+    tax_form_id: str,
+    file_path: str | None,
+    form_type: str,
+    tax_year: int,
+    user_id: str,
+    search_space_id: int,
+    extracted_text: str | None = None,
+):
+    """Parse tax form and save extracted data to database.
+
+    Looks up the search space's document LLM (best effort), parses either the
+    pre-extracted text or the file, stores the fields in the form-specific
+    table and updates the TaxForm status. Any failure is logged and the form
+    is marked as failed; nothing is raised to the caller.
+
+    Args:
+        tax_form_id: UUID (as string) of the TaxForm record.
+        file_path: Path to the uploaded file, or None when only text is available.
+        form_type: Type of tax form (W2, 1099-MISC, ...).
+        tax_year: Tax year for the form.
+        user_id: ID of the uploading user (not used by this function).
+        search_space_id: Search space used for the LLM config lookup.
+        extracted_text: Pre-extracted text, used when ``file_path`` is not given.
+    """
+    from datetime import datetime, timezone
+
+    from app.db import TaxForm
+    from app.parsers.tax_form_parser import TaxFormParser
+    from app.services.llm_service import get_document_summary_llm
+
+    async with get_celery_session_maker()() as session:
+        try:
+            # Get the user's configured LLM model string for document processing
+            llm_model = None
+            try:
+                user_llm = await get_document_summary_llm(session, search_space_id)
+                if user_llm:
+                    llm_model = user_llm.model
+                    logger.info(f"Using user's configured LLM for tax parsing: {llm_model}")
+            except Exception as e:
+                # Best effort: the parser falls back to its own default model
+                logger.warning(f"Could not get user LLM config, using default: {e}")
+
+            # Get the tax form record
+            result = await session.execute(
+                select(TaxForm).where(TaxForm.id == UUID(tax_form_id))
+            )
+            tax_form = result.scalar_one_or_none()
+
+            if not tax_form:
+                logger.error(f"Tax form record not found: {tax_form_id}")
+                return
+
+            logger.info(f"Starting tax form parsing for {form_type} (ID: {tax_form_id})")
+
+            # Initialize parser and parse the form
+            parser = TaxFormParser()
+
+            # If we have pre-extracted text, use it directly
+            if extracted_text and not file_path:
+                logger.info(f"Using pre-extracted text for {form_type} parsing")
+                parse_result = await parser.parse_from_text(
+                    text=extracted_text,
+                    form_type=form_type,
+                    tax_year=tax_year,
+                    llm_model=llm_model,
+                )
+            elif file_path:
+                parse_result = await parser.parse_tax_form(
+                    file_path=file_path,
+                    form_type=form_type,
+                    tax_year=tax_year,
+                    llm_model=llm_model,
+                )
+            else:
+                # Nothing to parse: fail with a clear message instead of
+                # handing file_path=None to the parser
+                raise ValueError("Neither file_path nor extracted_text was provided")
+
+            extracted_data = parse_result.get("extracted_data", {})
+            confidence_scores = parse_result.get("confidence_scores", {})
+            extraction_method = parse_result.get("extraction_method", "unknown")
+            needs_review = parse_result.get("needs_review", True)
+            raw_data = parse_result.get("raw_extraction_data", {})
+
+            logger.info(
+                f"Parsed {form_type}: method={extraction_method}, "
+                f"fields_extracted={len(extracted_data)}, needs_review={needs_review}"
+            )
+
+            # Save extracted data to the appropriate form table
+            if form_type == "W2":
+                await _save_w2_data(
+                    session, tax_form.id, extracted_data, confidence_scores, raw_data
+                )
+            elif form_type == "1099-MISC":
+                await _save_1099_misc_data(
+                    session, tax_form.id, extracted_data, confidence_scores, raw_data
+                )
+            elif form_type == "1099-INT":
+                await _save_1099_int_data(
+                    session, tax_form.id, extracted_data, confidence_scores, raw_data
+                )
+            elif form_type == "1099-DIV":
+                await _save_1099_div_data(
+                    session, tax_form.id, extracted_data, confidence_scores, raw_data
+                )
+            elif form_type == "1099-B":
+                await _save_1099_b_data(
+                    session, tax_form.id, extracted_data, confidence_scores, raw_data
+                )
+            else:
+                # e.g. 1095-C: the parser supports it but no table is written
+                # here, so make the dropped data visible in the logs
+                logger.warning(
+                    f"No persistence handler for form type {form_type}; "
+                    f"{len(extracted_data)} extracted field(s) were not saved (ID: {tax_form_id})"
+                )
+
+            # Update tax form status
+            tax_form.processing_status = "review_needed" if needs_review else "completed"
+            tax_form.extraction_method = extraction_method
+            tax_form.needs_review = needs_review
+            tax_form.processed_at = datetime.now(timezone.utc)
+
+            await session.commit()
+            logger.info(f"Tax form parsing completed for {form_type} (ID: {tax_form_id})")
+
+        except FileNotFoundError as e:
+            logger.error(f"Tax form file not found: {file_path}")
+            await _mark_tax_form_failed(session, tax_form_id, str(e))
+
+        except Exception as e:
+            logger.exception(f"Error parsing tax form {tax_form_id}: {e}")
+            await _mark_tax_form_failed(session, tax_form_id, str(e))
+
+
+async def _mark_tax_form_failed(session, tax_form_id: str, error_message: str):
+    """Mark tax form as failed.
+
+    Args:
+        session: Async DB session; may be the one in which parsing just failed.
+        tax_form_id: UUID (as string) of the TaxForm record.
+        error_message: Failure description; a truncated copy is stored in
+            ``extraction_method``.
+    """
+    from app.db import TaxForm
+
+    # Start from a clean transaction: after a database error the session
+    # refuses further work until rolled back, and after a parse error it may
+    # still hold half-built rows that must not be committed with the failure.
+    await session.rollback()
+
+    result = await session.execute(
+        select(TaxForm).where(TaxForm.id == UUID(tax_form_id))
+    )
+    tax_form = result.scalar_one_or_none()
+
+    if tax_form:
+        tax_form.processing_status = "failed"
+        # Store error in extraction_method field as fallback (truncate to fit VARCHAR(50))
+        tax_form.extraction_method = f"error: {error_message[:40]}"
+        await session.commit()
+        logger.error(f"Marked tax form {tax_form_id} as failed: {error_message}")
+
+
+def _hash_pii(value: Optional[str]) -> Optional[str]:
+    """Return the SHA-256 hex digest of a PII value, or None for empty/missing input."""
+    import hashlib
+
+    # NOTE(review): plain unsalted SHA-256; SSNs/EINs have a small input space,
+    # so consider a keyed hash if these digests must resist guessing.
+    return hashlib.sha256(value.encode()).hexdigest() if value else None
+
+
+async def _save_w2_data(session, tax_form_id: UUID, data: dict, confidence: dict, raw_data: dict):
+    """Build a ``W2Form`` row from extracted W-2 fields and flush it.
+
+    Args:
+        session: Async database session. The row is added and flushed, not
+            committed.
+        tax_form_id: ID of the parent tax form record.
+        data: Extracted field values keyed by W2Form column name, plus the
+            raw ``employee_ssn`` / ``employer_ein`` values.
+        confidence: Per-field confidence scores, stored unchanged.
+        raw_data: Raw extraction payload, stored unchanged.
+    """
+    from app.db import W2Form
+
+    # Monetary boxes: each is normalised through _to_decimal.
+    amount_fields = (
+        "wages_tips_compensation",
+        "federal_income_tax_withheld",
+        "social_security_wages",
+        "social_security_tax_withheld",
+        "medicare_wages",
+        "medicare_tax_withheld",
+        "social_security_tips",
+        "allocated_tips",
+        "dependent_care_benefits",
+        "nonqualified_plans",
+        "state_wages",
+        "state_income_tax",
+        "local_wages",
+        "local_income_tax",
+    )
+    # Values copied through as extracted.
+    passthrough_fields = (
+        "state_code",
+        "locality_name",
+        "box_12_codes",
+        "employer_name",
+        "employer_address",
+    )
+    # Box 13 checkboxes: default to False when the key is absent.
+    checkbox_fields = (
+        "statutory_employee",
+        "retirement_plan",
+        "third_party_sick_pay",
+    )
+
+    columns = {name: _to_decimal(data.get(name)) for name in amount_fields}
+    columns.update((name, data.get(name)) for name in passthrough_fields)
+    columns.update((name, data.get(name, False)) for name in checkbox_fields)
+
+    w2 = W2Form(
+        tax_form_id=tax_form_id,
+        # Identifiers are stored only as hashes.
+        employee_ssn_hash=_hash_pii(data.get("employee_ssn")),
+        employer_ein_hash=_hash_pii(data.get("employer_ein")),
+        employee_name_masked="[EMPLOYEE_NAME]",  # Always mask for UI
+        # Metadata
+        field_confidence_scores=confidence,
+        raw_extraction_data=raw_data,
+        **columns,
+    )
+
+    session.add(w2)
+    await session.flush()
+    logger.info(f"Saved W2 form data for tax_form_id={tax_form_id}")
+
+
+async def _save_1099_misc_data(session, tax_form_id: UUID, data: dict, confidence: dict, raw_data: dict):
+    """Build a ``Form1099Misc`` row from extracted 1099-MISC fields and flush it.
+
+    Args:
+        session: Async database session. The row is added and flushed, not
+            committed.
+        tax_form_id: ID of the parent tax form record.
+        data: Extracted field values keyed by column name, plus the raw
+            ``payer_tin`` / ``recipient_tin`` values.
+        confidence: Per-field confidence scores, stored unchanged.
+        raw_data: Raw extraction payload, stored unchanged.
+    """
+    from app.db import Form1099Misc
+
+    # Monetary boxes: each is normalised through _to_decimal.
+    amount_fields = (
+        "rents",
+        "royalties",
+        "other_income",
+        "federal_income_tax_withheld",
+        "fishing_boat_proceeds",
+        "medical_health_payments",
+        "substitute_payments",
+        "crop_insurance_proceeds",
+        "gross_proceeds_attorney",
+        "section_409a_deferrals",
+        "state_tax_withheld",
+        "state_income",
+    )
+    # Values copied through as extracted.
+    passthrough_fields = ("payer_name", "payer_address", "state_payer_number")
+
+    columns = {name: _to_decimal(data.get(name)) for name in amount_fields}
+    columns.update((name, data.get(name)) for name in passthrough_fields)
+
+    record = Form1099Misc(
+        tax_form_id=tax_form_id,
+        # Taxpayer identification numbers are stored only as hashes.
+        payer_tin_hash=_hash_pii(data.get("payer_tin")),
+        recipient_tin_hash=_hash_pii(data.get("recipient_tin")),
+        field_confidence_scores=confidence,
+        raw_extraction_data=raw_data,
+        **columns,
+    )
+
+    session.add(record)
+    await session.flush()
+    logger.info(f"Saved 1099-MISC data for tax_form_id={tax_form_id}")
+
+
+async def _save_1099_int_data(session, tax_form_id: UUID, data: dict, confidence: dict, raw_data: dict):
+    """Build a ``Form1099Int`` row from extracted 1099-INT fields and flush it.
+
+    Args:
+        session: Async database session. The row is added and flushed, not
+            committed.
+        tax_form_id: ID of the parent tax form record.
+        data: Extracted field values keyed by column name, plus the raw
+            ``payer_tin`` / ``recipient_tin`` values.
+        confidence: Per-field confidence scores, stored unchanged.
+        raw_data: Raw extraction payload, stored unchanged.
+    """
+    from app.db import Form1099Int
+
+    # Monetary boxes: each is normalised through _to_decimal.
+    amount_fields = (
+        "interest_income",
+        "early_withdrawal_penalty",
+        "interest_on_us_savings_bonds",
+        "federal_income_tax_withheld",
+        "investment_expenses",
+        "foreign_tax_paid",
+        "tax_exempt_interest",
+        "specified_private_activity_bond_interest",
+        "market_discount",
+        "bond_premium",
+        "bond_premium_on_treasury",
+        "bond_premium_on_tax_exempt",
+        "state_tax_withheld",
+    )
+    # Values copied through as extracted.
+    passthrough_fields = (
+        "payer_name",
+        "payer_address",
+        "foreign_country",
+        "state_code",
+        "state_id",
+    )
+
+    columns = {name: _to_decimal(data.get(name)) for name in amount_fields}
+    columns.update((name, data.get(name)) for name in passthrough_fields)
+
+    record = Form1099Int(
+        tax_form_id=tax_form_id,
+        # Taxpayer identification numbers are stored only as hashes.
+        payer_tin_hash=_hash_pii(data.get("payer_tin")),
+        recipient_tin_hash=_hash_pii(data.get("recipient_tin")),
+        field_confidence_scores=confidence,
+        raw_extraction_data=raw_data,
+        **columns,
+    )
+
+    session.add(record)
+    await session.flush()
+    logger.info(f"Saved 1099-INT data for tax_form_id={tax_form_id}")
+
+
+async def _save_1099_div_data(session, tax_form_id: UUID, data: dict, confidence: dict, raw_data: dict):
+    """Build a ``Form1099Div`` row from extracted 1099-DIV fields and flush it.
+
+    Args:
+        session: Async database session. The row is added and flushed, not
+            committed.
+        tax_form_id: ID of the parent tax form record.
+        data: Extracted field values keyed by column name, plus the raw
+            ``payer_tin`` / ``recipient_tin`` values.
+        confidence: Per-field confidence scores, stored unchanged.
+        raw_data: Raw extraction payload, stored unchanged.
+    """
+    from app.db import Form1099Div
+
+    # Monetary boxes: each is normalised through _to_decimal.
+    amount_fields = (
+        "total_ordinary_dividends",
+        "qualified_dividends",
+        "total_capital_gain_distributions",
+        "unrecaptured_section_1250_gain",
+        "section_1202_gain",
+        "collectibles_28_gain",
+        "section_897_ordinary_dividends",
+        "section_897_capital_gain",
+        "nondividend_distributions",
+        "federal_income_tax_withheld",
+        "section_199a_dividends",
+        "investment_expenses",
+        "foreign_tax_paid",
+        "cash_liquidation_distributions",
+        "noncash_liquidation_distributions",
+        "exempt_interest_dividends",
+        "specified_private_activity_bond_interest_dividends",
+        "state_tax_withheld",
+    )
+    # Values copied through as extracted.
+    passthrough_fields = ("payer_name", "payer_address", "foreign_country")
+
+    columns = {name: _to_decimal(data.get(name)) for name in amount_fields}
+    columns.update((name, data.get(name)) for name in passthrough_fields)
+
+    record = Form1099Div(
+        tax_form_id=tax_form_id,
+        # Taxpayer identification numbers are stored only as hashes.
+        payer_tin_hash=_hash_pii(data.get("payer_tin")),
+        recipient_tin_hash=_hash_pii(data.get("recipient_tin")),
+        field_confidence_scores=confidence,
+        raw_extraction_data=raw_data,
+        **columns,
+    )
+
+    session.add(record)
+    await session.flush()
+    logger.info(f"Saved 1099-DIV data for tax_form_id={tax_form_id}")
+
+
+async def _save_1099_b_data(session, tax_form_id: UUID, data: dict, confidence: dict, raw_data: dict):
+    """Build a ``Form1099B`` row from extracted 1099-B fields and flush it.
+
+    Broker-specific keys (``broker_name``, ``broker_tin``, ``broker_address``)
+    take precedence over their ``payer_*`` equivalents, and
+    ``wash_sale_loss`` is accepted as an alias for
+    ``wash_sale_loss_disallowed``.
+
+    Args:
+        session: Async database session. The row is added and flushed, not
+            committed.
+        tax_form_id: ID of the parent tax form record.
+        data: Extracted field values keyed by column name or by the aliases
+            above.
+        confidence: Per-field confidence scores, stored unchanged.
+        raw_data: Raw extraction payload, stored unchanged.
+    """
+    from app.db import Form1099B
+
+    # Fall back to the alias only when the primary key is missing or blank.
+    # A plain ``a or b`` would also discard a legitimate numeric 0 and store
+    # NULL instead of zero.
+    wash_sale_loss = data.get("wash_sale_loss_disallowed")
+    if wash_sale_loss in (None, ""):
+        wash_sale_loss = data.get("wash_sale_loss")
+
+    # Monetary boxes: each is normalised through _to_decimal.
+    amount_fields = (
+        "proceeds",
+        "cost_basis",
+        "accrued_market_discount",
+        "federal_income_tax_withheld",
+        "state_tax_withheld",
+    )
+    # Values copied through as extracted.
+    # NOTE(review): date_acquired / date_sold are stored as extracted -
+    # confirm the column type accepts that representation.
+    passthrough_fields = (
+        "description_of_property",
+        "date_acquired",
+        "date_sold",
+        "state_code",
+        "state_id",
+    )
+    # Form 8949 checkboxes and related flags: default to False when absent.
+    checkbox_fields = (
+        "short_term_box_a",
+        "short_term_box_b",
+        "short_term_box_c",
+        "long_term_box_d",
+        "long_term_box_e",
+        "long_term_box_f",
+        "loss_not_allowed",
+        "noncovered_security",
+        "basis_reported_to_irs",
+    )
+
+    columns = {name: _to_decimal(data.get(name)) for name in amount_fields}
+    columns.update((name, data.get(name)) for name in passthrough_fields)
+    columns.update((name, data.get(name, False)) for name in checkbox_fields)
+
+    form = Form1099B(
+        tax_form_id=tax_form_id,
+        payer_name=data.get("broker_name") or data.get("payer_name"),
+        # Taxpayer identification numbers are stored only as hashes.
+        payer_tin_hash=_hash_pii(data.get("broker_tin") or data.get("payer_tin")),
+        payer_address=data.get("broker_address") or data.get("payer_address"),
+        recipient_tin_hash=_hash_pii(data.get("recipient_tin")),
+        wash_sale_loss_disallowed=_to_decimal(wash_sale_loss),
+        field_confidence_scores=confidence,
+        raw_extraction_data=raw_data,
+        **columns,
+    )
+
+    session.add(form)
+    await session.flush()
+    logger.info(f"Saved 1099-B data for tax_form_id={tax_form_id}")
+
+
+def _to_decimal(value) -> Decimal | None:
+    """Convert an extracted monetary value to a finite ``Decimal``.
+
+    Accepted inputs:
+        * ``Decimal``, ``int`` and ``float`` values (floats go through
+          ``str`` so ``0.1`` becomes ``Decimal("0.1")``).
+        * Strings, optionally containing ``$``, thousands separators and
+          spaces, or written in accounting notation where parentheses mean a
+          negative amount: ``"(1,234.56)"`` becomes ``Decimal("-1234.56")``.
+
+    Returns:
+        The parsed ``Decimal``, or ``None`` for ``None``, booleans, blank or
+        unparseable strings, unsupported types, and non-finite values
+        (NaN / Infinity).
+    """
+    from decimal import InvalidOperation
+
+    # bool is a subclass of int; a checkbox value must not become 1 or 0 dollars.
+    if value is None or isinstance(value, bool):
+        return None
+
+    if isinstance(value, Decimal):
+        parsed = value
+    elif isinstance(value, (int, float)):
+        parsed = Decimal(str(value))
+    elif isinstance(value, str):
+        # Remove currency symbols, commas, spaces
+        cleaned = value.replace("$", "").replace(",", "").replace(" ", "").strip()
+        # Accounting notation: "(123.45)" means -123.45
+        if cleaned.startswith("(") and cleaned.endswith(")"):
+            cleaned = "-" + cleaned[1:-1]
+        if not cleaned:
+            return None
+        try:
+            parsed = Decimal(cleaned)
+        except InvalidOperation:
+            return None
+    else:
+        return None
+
+    # Decimal accepts "NaN" and "Infinity"; neither is a usable dollar amount.
+    return parsed if parsed.is_finite() else None
diff --git a/financegpt_backend/app/tasks/document_processors/file_processors.py b/financegpt_backend/app/tasks/document_processors/file_processors.py
index bbc1d7b..4d5f941 100644
--- a/financegpt_backend/app/tasks/document_processors/file_processors.py
+++ b/financegpt_backend/app/tasks/document_processors/file_processors.py
@@ -58,6 +58,8 @@ async def _process_tax_form_if_applicable(
filename: str,
user_id: str,
search_space_id: int,
+ file_path: str | None = None,
+ extracted_text: str | None = None,
) -> None:
"""
Check if uploaded document is a tax form and trigger parsing if so.
@@ -71,18 +73,11 @@ async def _process_tax_form_if_applicable(
filename: Original filename
user_id: User ID
search_space_id: Search space ID
+ file_path: Path to the uploaded file (optional, for parsing)
+ extracted_text: Already extracted text from the document (used if file_path not available)
"""
from uuid import UUID
from app.db import TaxForm
- from app.parsers.tax_form_parser import TaxFormParser
- from app.utils.pii_masking import prepare_tax_form_for_storage
- from app.schemas.tax_forms import (
- W2FormCreate,
- Form1099MiscCreate,
- Form1099IntCreate,
- Form1099DivCreate,
- Form1099BCreate,
- )
import re
try:
@@ -100,11 +95,21 @@ async def _process_tax_form_if_applicable(
if year_match:
tax_year = int(year_match.group(1))
+ # If no year in filename, try to extract from document content
+ if not tax_year and extracted_text:
+ # Look for tax year patterns in content (e.g., "2025 W-2", "Tax Year 2025", "for the 2025 tax year")
+ content_year_match = re.search(r'(?:tax\s+year|for\s+(?:the\s+)?)\s*(20\d{2})|(?:20\d{2})\s+W-?2', extracted_text, re.IGNORECASE)
+ if content_year_match:
+ year_str = content_year_match.group(1) or re.search(r'(20\d{2})', content_year_match.group(0)).group(1)
+ tax_year = int(year_str)
+
# Detect form type
if 'w2' in filename_lower or 'w-2' in filename_lower:
form_type = 'W2'
if not tax_year:
- tax_year = 2024 # Default to current tax year
+ # Default to previous year (tax forms are usually for prior year)
+ from datetime import datetime
+ tax_year = datetime.now().year - 1
elif '1099' in filename_lower:
if 'misc' in filename_lower:
form_type = '1099-MISC'
@@ -119,7 +124,18 @@ async def _process_tax_form_if_applicable(
form_type = '1099-MISC' # Default
if not tax_year:
- tax_year = 2024
+ # Default to previous year (tax forms are usually for prior year)
+ from datetime import datetime
+ tax_year = datetime.now().year - 1
+ elif '1095' in filename_lower:
+ if 'c' in filename_lower or '-c' in filename_lower:
+ form_type = '1095-C'
+ else:
+ form_type = '1095-C' # Default to 1095-C (most common employer form)
+
+ if not tax_year:
+ from datetime import datetime
+ tax_year = datetime.now().year - 1
# If not a tax form, return early
if not form_type:
@@ -142,24 +158,89 @@ async def _process_tax_form_if_applicable(
logger.info(f"Created tax form record with ID {tax_form.id}, starting parsing...")
- # TODO: Trigger async parsing task
- # For now, log that parsing would happen
- # In production, this would be a Celery task
- logger.info(
- f"Tax form parsing would be triggered here for {form_type} (tax_form_id={tax_form.id}). "
- f"Parser integration pending."
- )
+ # Prefer file_path if available, otherwise use extracted_text
+ actual_file_path = file_path
+ actual_extracted_text = extracted_text
+
+ if not actual_file_path and not actual_extracted_text:
+ logger.warning(
+ f"No file path or extracted text provided for tax form {tax_form.id}, parsing will be skipped"
+ )
+ tax_form.processing_status = 'failed'
+ tax_form.extraction_method = 'error: no file path or text available'
+ await session.commit()
+ return
# Update status to show it's queued for processing
tax_form.processing_status = 'processing'
await session.commit()
+ # Trigger async parsing task
+ from app.tasks.celery_tasks.tax_form_tasks import parse_tax_form_task
+
+ parse_tax_form_task.delay(
+ tax_form_id=str(tax_form.id),
+ file_path=actual_file_path,
+ form_type=form_type,
+ tax_year=tax_year,
+ user_id=user_id,
+ search_space_id=search_space_id,
+ extracted_text=actual_extracted_text,
+ )
+
+ logger.info(f"Queued tax form parsing task for {form_type} (tax_form_id={tax_form.id})")
+
except Exception as e:
logger.error(f"Error processing tax form for {filename}: {e}")
# Don't fail the document upload if tax parsing fails
await session.rollback()
+def _clean_symbol(raw_symbol: str | None) -> str:
+    """
+    Normalise a raw holding symbol so it fits the VARCHAR(20) constraint.
+
+    Rules, applied in order:
+    - ``None`` or empty input -> ""
+    - Cash / money-market descriptions (e.g. Schwab's
+      "**CASH & CASH INVESTMENTS**") -> "CASH"
+    - Otherwise asterisks and surrounding whitespace are removed and the
+      result is cut to 20 characters
+
+    Args:
+        raw_symbol: Raw symbol string from CSV
+
+    Returns:
+        Cleaned symbol string (max 20 chars)
+    """
+    max_length = 20  # VARCHAR(20) limit
+    cash_markers = (
+        "cash & cash investments",
+        "cash investments",
+        "money market",
+        "sweep account",
+        "cash equivalent",
+        "cash balance",
+    )
+
+    if not raw_symbol:
+        return ""
+
+    text = str(raw_symbol).strip()
+
+    # Match cash descriptions case-insensitively, ignoring decorative asterisks
+    folded = text.lower().replace("*", "").strip()
+    if any(marker in folded for marker in cash_markers):
+        return "CASH"
+
+    return text.replace("*", "").strip()[:max_length]
+
+
async def _save_investment_holdings(
session: AsyncSession,
user_id: str,
@@ -199,7 +280,11 @@ async def _save_investment_holdings(
# Process each holding
total_value = 0.0
for holding_data in holdings_data:
- symbol = holding_data.symbol if hasattr(holding_data, 'symbol') else holding_data.get('symbol', '')
+ raw_symbol = holding_data.symbol if hasattr(holding_data, 'symbol') else holding_data.get('symbol', '')
+
+ # Clean and normalize symbol (VARCHAR(20) limit in DB)
+ # Handle special cases like Schwab's "**CASH & CASH INVESTMENTS**"
+ symbol = _clean_symbol(raw_symbol)
# Handle None values from CSV parsing
quantity_raw = holding_data.quantity if hasattr(holding_data, 'quantity') else holding_data.get('quantity')
@@ -631,8 +716,10 @@ async def add_received_file_document_using_unstructured(
await session.refresh(document)
# After successful document creation, check if this is a tax form
+ # Pass the extracted markdown content for parsing
await _process_tax_form_if_applicable(
- session, document, file_name, user_id, search_space_id
+ session, document, file_name, user_id, search_space_id,
+ extracted_text=file_in_markdown
)
return document
@@ -775,8 +862,10 @@ async def add_received_file_document_using_llamacloud(
await session.refresh(document)
# After successful document creation, check if this is a tax form
+ # Pass the extracted markdown content for parsing
await _process_tax_form_if_applicable(
- session, document, file_name, user_id, search_space_id
+ session, document, file_name, user_id, search_space_id,
+ extracted_text=file_in_markdown
)
return document
@@ -943,6 +1032,13 @@ async def add_received_file_document_using_docling(
await session.commit()
await session.refresh(document)
+ # After successful document creation, check if this is a tax form
+ # Pass the extracted markdown content for parsing
+ await _process_tax_form_if_applicable(
+ session, document, file_name, user_id, search_space_id,
+ extracted_text=file_in_markdown
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
diff --git a/financegpt_backend/app/utils/pii_masking.py b/financegpt_backend/app/utils/pii_masking.py
index 97f8286..1342ec3 100644
--- a/financegpt_backend/app/utils/pii_masking.py
+++ b/financegpt_backend/app/utils/pii_masking.py
@@ -285,3 +285,100 @@ def validate_confidence_threshold(
all_passed = len(failed_fields) == 0
return all_passed, failed_fields
+
+
+def mask_pii_in_text(text: str) -> tuple[str, dict[str, str]]:
+    """Replace SSNs and EINs in raw text with placeholders before it is sent to an LLM.
+
+    Three passes run in order:
+    - Dashed SSNs (XXX-XX-XXXX) become ``[SSN:***-**-<last four>]``
+    - Nine-digit SSNs that directly follow an SSN keyword become
+      ``[SSN:*****<last four>]`` (the keyword itself is kept)
+    - EINs (XX-XXXXXXX) become ``[EIN:<first 8 hex chars of the SHA-256 of the digits>]``
+
+    Dollar amounts are left intact (needed for extraction).
+
+    Args:
+        text: Raw text containing potential PII
+
+    Returns:
+        Tuple of (masked_text, mapping_dict)
+        - masked_text: Text with PII replaced by placeholders
+        - mapping_dict: Original values keyed by placeholder (for recovery if needed)
+
+    Examples:
+        >>> text = "SSN: 123-45-6789, EIN: 12-3456789, Wages: $50,000"
+        >>> masked, mapping = mask_pii_in_text(text)
+        >>> "123-45-6789" not in masked
+        True
+        >>> "$50,000" in masked  # Dollar amounts preserved
+        True
+    """
+    placeholders = {}
+
+    def _dashed_ssn(match):
+        token = f"[SSN:***-**-{match.group(3)}]"
+        placeholders[token] = match.group(0)
+        return token
+
+    def _keyword_ssn(match):
+        digits = match.group(1)
+        token = f"[SSN:*****{digits[-4:]}]"
+        placeholders[token] = digits
+        # Keep the keyword prefix; only the digits (which end the match) are swapped out
+        keyword_prefix = match.group(0)[: match.start(1) - match.start(0)]
+        return keyword_prefix + token
+
+    def _ein(match):
+        original = match.group(0)
+        # Truncated hash keeps the placeholder short
+        digest = hashlib.sha256(original.replace('-', '').encode()).hexdigest()[:8]
+        token = f"[EIN:{digest}]"
+        placeholders[token] = original
+        return token
+
+    # Order matters: dashed SSNs are masked before the EIN pass runs.
+    passes = (
+        (r'\b(\d{3})-(\d{2})-(\d{4})\b', _dashed_ssn),
+        # Undashed SSNs are only matched after an SSN keyword, to avoid
+        # catching phone or account numbers
+        (r'(?i)(?:ssn|social\s*security|ss#|ss\s*#|soc\s*sec)[:\s]*(\d{9})\b', _keyword_ssn),
+        (r'\b(\d{2})-(\d{7})\b', _ein),
+    )
+
+    masked = text
+    for pattern, replacer in passes:
+        masked = re.sub(pattern, replacer, masked)
+
+    return masked, placeholders
+
+
+def recover_pii_from_mapping(
+    extracted_data: dict[str, Any],
+    mapping: dict[str, str]
+) -> dict[str, Any]:
+    """Swap masked placeholders in LLM output back to their original PII values.
+
+    Only top-level string values are inspected. When a string contains a
+    placeholder, the WHOLE value is replaced by the original PII for the
+    first matching placeholder (in mapping order); any surrounding text is
+    dropped. Non-string values are copied through unchanged.
+
+    Args:
+        extracted_data: Data extracted from LLM with masked values
+        mapping: Mapping from mask_pii_in_text()
+
+    Returns:
+        New dict with original PII values restored
+    """
+
+    def _restore(field_value):
+        if not isinstance(field_value, str):
+            return field_value
+        # First placeholder found wins; fall back to the value itself
+        return next(
+            (pii for token, pii in mapping.items() if token in field_value),
+            field_value,
+        )
+
+    return {field: _restore(field_value) for field, field_value in extracted_data.items()}
diff --git a/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO b/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO
index 0627bb2..a9042d4 100644
--- a/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO
+++ b/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO
@@ -31,6 +31,7 @@ Requires-Dist: en-core-web-sm @ https://github.com/explosion/spacy-models/releas
Requires-Dist: static-ffmpeg>=2.13
Requires-Dist: tavily-python>=0.3.2
Requires-Dist: unstructured-client>=0.30.0
+Requires-Dist: unstructured[all-docs]==0.17.2
Requires-Dist: uvicorn[standard]>=0.34.0
Requires-Dist: validators>=0.34.0
Requires-Dist: youtube-transcript-api>=1.0.3
diff --git a/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt b/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt
index 58fe279..85aba3b 100644
--- a/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt
+++ b/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt
@@ -2,6 +2,7 @@ pyproject.toml
alembic/env.py
alembic/versions/0_initial_schema.py
alembic/versions/1_add_investment_holdings_tables.py
+alembic/versions/2_add_tax_forms_tables.py
app/__init__.py
app/app.py
app/celery_app.py
@@ -29,6 +30,7 @@ app/agents/new_chat/tools/portfolio_performance.py
app/agents/new_chat/tools/registry.py
app/agents/new_chat/tools/search_financegpt_docs.py
app/agents/new_chat/tools/search_transactions.py
+app/agents/new_chat/tools/tax_analysis.py
app/agents/new_chat/tools/user_memory.py
app/agents/podcaster/__init__.py
app/agents/podcaster/configuration.py
@@ -49,12 +51,11 @@ app/connectors/google_drive/file_types.py
app/connectors/google_drive/folder_manager.py
app/parsers/__init__.py
app/parsers/base_financial_parser.py
-app/parsers/chase_parser.py
-app/parsers/discover_parser.py
-app/parsers/fidelity_parser.py
+app/parsers/llm_csv_parser.py
app/parsers/ofx_parser.py
app/parsers/parser_factory.py
app/parsers/pdf_statement_parser.py
+app/parsers/tax_form_parser.py
app/prompts/__init__.py
app/retriever/__init__.py
app/retriever/chunks_hybrid_search.py
@@ -65,7 +66,6 @@ app/routes/composio_routes.py
app/routes/documents_routes.py
app/routes/editor_routes.py
app/routes/financegpt_docs_routes.py
-app/routes/investments.py
app/routes/logs_routes.py
app/routes/new_chat_routes.py
app/routes/new_llm_config_routes.py
@@ -97,6 +97,7 @@ app/schemas/rbac_schemas.py
app/schemas/search_source_connector.py
app/schemas/search_space.py
app/schemas/slack_auth_credentials.py
+app/schemas/tax_forms.py
app/schemas/teams_auth_credentials.py
app/schemas/users.py
app/services/__init__.py
@@ -109,6 +110,7 @@ app/services/investment_csv_parser.py
app/services/kokoro_tts_service.py
app/services/llm_document_extractor.py
app/services/llm_service.py
+app/services/llm_stock_enrichment.py
app/services/new_streaming_service.py
app/services/notification_service.py
app/services/page_limit_service.py
@@ -126,6 +128,7 @@ app/tasks/celery_tasks/connector_tasks.py
app/tasks/celery_tasks/document_reindex_tasks.py
app/tasks/celery_tasks/document_tasks.py
app/tasks/celery_tasks/schedule_checker_task.py
+app/tasks/celery_tasks/tax_form_tasks.py
app/tasks/chat/stream_new_chat.py
app/tasks/connector_indexers/__init__.py
app/tasks/connector_indexers/base.py
@@ -147,6 +150,7 @@ app/utils/credit_card_rewards_fetcher.py
app/utils/document_converters.py
app/utils/oauth_security.py
app/utils/periodic_scheduler.py
+app/utils/pii_masking.py
app/utils/rbac.py
app/utils/subscription_utils.py
app/utils/validators.py
diff --git a/financegpt_backend/financegpt_new_backend.egg-info/requires.txt b/financegpt_backend/financegpt_new_backend.egg-info/requires.txt
index c840bdc..83a761b 100644
--- a/financegpt_backend/financegpt_new_backend.egg-info/requires.txt
+++ b/financegpt_backend/financegpt_new_backend.egg-info/requires.txt
@@ -26,6 +26,7 @@ en-core-web-sm @ https://github.com/explosion/spacy-models/releases/download/en_
static-ffmpeg>=2.13
tavily-python>=0.3.2
unstructured-client>=0.30.0
+unstructured[all-docs]==0.17.2
uvicorn[standard]>=0.34.0
validators>=0.34.0
youtube-transcript-api>=1.0.3
diff --git a/financegpt_web/components/layout/providers/LayoutDataProvider.tsx b/financegpt_web/components/layout/providers/LayoutDataProvider.tsx
index 3fe8130..c76a9af 100644
--- a/financegpt_web/components/layout/providers/LayoutDataProvider.tsx
+++ b/financegpt_web/components/layout/providers/LayoutDataProvider.tsx
@@ -31,6 +31,7 @@ import { LayoutShell } from "../ui/shell";
import { AllPrivateChatsSidebar } from "../ui/sidebar/AllPrivateChatsSidebar";
import { AllSharedChatsSidebar } from "../ui/sidebar/AllSharedChatsSidebar";
import { InboxSidebar } from "../ui/sidebar/InboxSidebar";
+import { UploadStatusPanel } from "../ui/upload-status-panel";
interface LayoutDataProviderProps {
searchSpaceId: string;
@@ -553,6 +554,9 @@ export function LayoutDataProvider({
markAllAsRead={markAllAsRead}
/>
+ {/* Google Drive-style Upload Status Panel */}
+
+
{/* Create Search Space Dialog */}
Promise;
+}
+
+/**
+ * Get display name for connector type.
+ *
+ * Known connector types map to a fixed label. Any other value is humanised:
+ * underscores become spaces, the standalone words "CONNECTOR" and "API" are
+ * removed, and whitespace is collapsed.
+ *
+ * @param connectorType - Connector type identifier, e.g. "GITHUB_CONNECTOR"
+ * @returns Human-readable name, e.g. "GitHub"
+ */
+function getConnectorTypeDisplayName(connectorType: string): string {
+	const displayNames: Record<string, string> = {
+		GITHUB_CONNECTOR: "GitHub",
+		GOOGLE_CALENDAR_CONNECTOR: "Google Calendar",
+		GOOGLE_GMAIL_CONNECTOR: "Gmail",
+		GOOGLE_DRIVE_CONNECTOR: "Google Drive",
+		LINEAR_CONNECTOR: "Linear",
+		NOTION_CONNECTOR: "Notion",
+		SLACK_CONNECTOR: "Slack",
+		TEAMS_CONNECTOR: "Microsoft Teams",
+		DISCORD_CONNECTOR: "Discord",
+		JIRA_CONNECTOR: "Jira",
+		CONFLUENCE_CONNECTOR: "Confluence",
+		BOOKSTACK_CONNECTOR: "BookStack",
+		CLICKUP_CONNECTOR: "ClickUp",
+		AIRTABLE_CONNECTOR: "Airtable",
+		LUMA_CONNECTOR: "Luma",
+		ELASTICSEARCH_CONNECTOR: "Elasticsearch",
+		WEBCRAWLER_CONNECTOR: "Web Crawler",
+		YOUTUBE_CONNECTOR: "YouTube",
+		CIRCLEBACK_CONNECTOR: "Circleback",
+		MCP_CONNECTOR: "MCP",
+		TAVILY_API: "Tavily",
+		SEARXNG_API: "SearXNG",
+		LINKUP_API: "Linkup",
+		BAIDU_SEARCH_API: "Baidu",
+	};
+
+	// Own-property check: a plain object lookup would also answer for inherited
+	// keys such as "toString" or "constructor" and return a function.
+	const known = Object.prototype.hasOwnProperty.call(displayNames, connectorType)
+		? displayNames[connectorType]
+		: undefined;
+	if (known) {
+		return known;
+	}
+
+	return (
+		connectorType
+			.replace(/_/g, " ")
+			// Whole words only, so e.g. "RAPID" does not lose its embedded "API"
+			.replace(/\b(?:CONNECTOR|API)\b/gi, "")
+			.replace(/\s+/g, " ")
+			.trim()
+	);
+}
+
+/**
+ * Google Drive-style floating upload status panel
+ * Shows in-progress uploads and recent completions
+ */
+export function UploadStatusPanel({ inboxItems, markAsRead }: UploadStatusPanelProps) {
+ const t = useTranslations("sidebar");
+ const [isExpanded, setIsExpanded] = useState(true);
+ const [mounted, setMounted] = useState(false);
+ const [dismissedIds, setDismissedIds] = useState>(new Set());
+
+ useEffect(() => {
+ setMounted(true);
+ }, []);
+
+ // Filter to only status items (uploads, connector indexing, document processing)
+ // Only show items from the last 30 minutes that are in progress or recently completed
+ const statusItems = useMemo(() => {
+ const thirtyMinutesAgo = new Date(Date.now() - 30 * 60 * 1000);
+
+ return inboxItems
+ .filter((item) => {
+ // Only status items
+ if (item.type !== "connector_indexing" && item.type !== "document_processing") {
+ return false;
+ }
+
+ // Don't show dismissed items
+ if (dismissedIds.has(item.id)) {
+ return false;
+ }
+
+ // Get status from metadata
+ const metadata = item.metadata as Record;
+ const status = typeof metadata?.status === "string" ? metadata.status : undefined;
+
+ // Always show in-progress items
+ if (status === "in_progress") {
+ return true;
+ }
+
+ // Show completed/failed items only if recent (within 30 mins)
+ const createdAt = new Date(item.created_at);
+ if (createdAt >= thirtyMinutesAgo) {
+ return true;
+ }
+
+ return false;
+ })
+ .sort((a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime())
+ .slice(0, 5); // Max 5 items to show
+ }, [inboxItems, dismissedIds]);
+
+ // Count in-progress items
+ const inProgressCount = useMemo(() => {
+ return statusItems.filter((item) => {
+ const metadata = item.metadata as Record;
+ return metadata?.status === "in_progress";
+ }).length;
+ }, [statusItems]);
+
+ // Auto-collapse when no items
+ useEffect(() => {
+ if (statusItems.length === 0) {
+ setIsExpanded(false);
+ } else if (inProgressCount > 0) {
+ // Auto-expand when new in-progress item appears
+ setIsExpanded(true);
+ }
+ }, [statusItems.length, inProgressCount]);
+
+ const handleDismiss = useCallback(
+ async (item: InboxItem, e: React.MouseEvent) => {
+ e.stopPropagation();
+ setDismissedIds((prev) => new Set([...prev, item.id]));
+ if (!item.read) {
+ await markAsRead(item.id);
+ }
+ },
+ [markAsRead]
+ );
+
+ const handleDismissAll = useCallback(async () => {
+ const ids = statusItems.map((item) => item.id);
+ setDismissedIds((prev) => new Set([...prev, ...ids]));
+
+ // Mark all as read
+ for (const item of statusItems) {
+ if (!item.read) {
+ await markAsRead(item.id);
+ }
+ }
+ }, [statusItems, markAsRead]);
+
+ const getStatusIcon = (item: InboxItem) => {
+ const metadata = item.metadata as Record;
+ const status = typeof metadata?.status === "string" ? metadata.status : undefined;
+
+ switch (status) {
+ case "in_progress":
+ return ;
+ case "completed":
+ return ;
+ case "failed":
+ return ;
+ default:
+ return ;
+ }
+ };
+
+ const getItemIcon = (item: InboxItem) => {
+ if (item.type === "connector_indexing" && isConnectorIndexingMetadata(item.metadata)) {
+ return getConnectorIcon(item.metadata.connector_type, "h-4 w-4");
+ }
+ return ;
+ };
+
+ const getProgressValue = (item: InboxItem): number | null => {
+ if (item.type === "connector_indexing" && isConnectorIndexingMetadata(item.metadata)) {
+ const meta = item.metadata as ConnectorIndexingMetadata;
+ if (meta.progress_percent !== undefined) {
+ return meta.progress_percent;
+ }
+ if (meta.total_count && meta.indexed_count) {
+ return Math.round((meta.indexed_count / meta.total_count) * 100);
+ }
+ }
+ return null;
+ };
+
+ const getItemSubtitle = (item: InboxItem): string => {
+ if (item.type === "connector_indexing" && isConnectorIndexingMetadata(item.metadata)) {
+ const meta = item.metadata as ConnectorIndexingMetadata;
+ return getConnectorTypeDisplayName(meta.connector_type);
+ }
+ if (item.type === "document_processing" && isDocumentProcessingMetadata(item.metadata)) {
+ const meta = item.metadata as DocumentProcessingMetadata;
+ return meta.processing_stage.replace(/_/g, " ");
+ }
+ return item.type.replace(/_/g, " ");
+ };
+
+ // Don't render if no items and not mounted
+ if (!mounted || statusItems.length === 0) {
+ return null;
+ }
+
+ return createPortal(
+
+
+ {/* Header - Always visible */}
+
+
+ {/* Content - Collapsible */}
+
+ {isExpanded && (
+
+