diff --git a/.env.example b/.env.example
deleted file mode 100644
index f284483..0000000
--- a/.env.example
+++ /dev/null
@@ -1,42 +0,0 @@
-# Docker Specific Env's Only - Can skip if needed
-
-# Celery Config
-REDIS_PORT=6379
-FLOWER_PORT=5555
-
-# Frontend Configuration
-FRONTEND_PORT=3000
-NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000 (Default: http://localhost:8000)
-NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL or GOOGLE (Default: LOCAL)
-NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING (Default: DOCLING)
-# Backend Configuration
-BACKEND_PORT=8000
-
-# Database Configuration
-POSTGRES_USER=postgres
-POSTGRES_PASSWORD=postgres
-POSTGRES_DB=financegpt
-POSTGRES_PORT=5432
-
-# Electric-SQL Configuration
-ELECTRIC_PORT=5133
-# PostgreSQL host for Electric connection
-# - 'db' for Docker PostgreSQL (service name in docker-compose)
-# - 'host.docker.internal' for local PostgreSQL (recommended when Electric runs in Docker)
-# Note: host.docker.internal works on Docker Desktop (Mac/Windows) and can be enabled on Linux
-POSTGRES_HOST=db
-ELECTRIC_DB_USER=electric
-ELECTRIC_DB_PASSWORD=electric_password
-NEXT_PUBLIC_ELECTRIC_URL=http://localhost:5133
-
-# pgAdmin Configuration
-PGADMIN_PORT=5050
-PGADMIN_DEFAULT_EMAIL=admin@financegpt.com
-PGADMIN_DEFAULT_PASSWORD=financegpt
-
-# Plaid Configuration (Financial Data Integration)
-# Get your credentials from: https://dashboard.plaid.com/team/keys
-PLAID_CLIENT_ID=your_plaid_client_id_here
-PLAID_SECRET=your_plaid_secret_here
-PLAID_ENV=sandbox
-PLAID_REDIRECT_URI=http://localhost:8000/api/v1/plaid/oauth-redirect
diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml
index 4591408..a14ac8b 100644
--- a/.github/workflows/docker-publish.yml
+++ b/.github/workflows/docker-publish.yml
@@ -59,7 +59,7 @@ jobs:
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
-          platforms: linux/amd64
+          platforms: linux/amd64,linux/arm64

     - name: Image digest
       run: echo "Image pushed with digest ${{ steps.meta.outputs.digest }}"
diff --git a/README.md b/README.md
index a020e4a..6e69f8c 100644
--- a/README.md
+++ b/README.md
@@ -217,62 +217,149 @@ FinanceGPT uses specialized AI tools to analyze your financial data and provide

 ## 🚀 Quick Start

-### Prerequisites
+### Option 1: Quick Start (Recommended for Testing)

-- **Node.js** 18+ and **pnpm**
-- **Python** 3.11+
-- **Docker** and **Docker Compose**
-- **PostgreSQL** 15+
+The fastest way to try FinanceGPT:

-### Installation
+```bash
+# 1. Clone the repository
+git clone https://github.com/yourusername/FinanceGPT.git
+cd FinanceGPT
+
+# 2. Run the quick start script
+chmod +x start-financegpt.sh
+./start-financegpt.sh
+```
+
+This uses pre-built Docker images and requires minimal configuration. Access the app at **http://localhost:3000**
+
+### Option 2: Development Setup
+
+For development with full customization:
+
+#### Prerequisites
+
+- **Docker** and **Docker Compose** (required)
+- **Node.js** 18+ and **pnpm** (for frontend development)
+- **Python** 3.11+ (for backend development)
+
+#### Required Environment Variables

-1. **Clone the repository**
+Before starting, you **must** configure these essential variables:
+
+1. **Create backend environment file:**
    ```bash
-   git clone https://github.com/yourusername/FinanceGPT.git
-   cd FinanceGPT
+   cp financegpt_backend/.env.example financegpt_backend/.env
    ```

-2. **Set up environment variables**
+2. 
**Edit `financegpt_backend/.env` and set these required variables:** + + ```env + # === REQUIRED: LLM API Key === + # You MUST configure at least one LLM provider for FinanceGPT to work + # Get your API key from the respective provider: + + # OpenAI (recommended for best results) + OPENAI_API_KEY=sk-... + + # OR Anthropic Claude + ANTHROPIC_API_KEY=sk-ant-... + + # OR Google Gemini (free tier available) + GOOGLE_API_KEY=AIza... + + # === REQUIRED: Plaid (for bank account connections) === + # Sign up at https://dashboard.plaid.com/team/keys + PLAID_CLIENT_ID=your_plaid_client_id + PLAID_SECRET=your_plaid_secret + PLAID_ENV=sandbox # Use 'sandbox' for testing + + # === REQUIRED: Security === + SECRET_KEY=your-random-secret-key-change-this-in-production + + # === Optional: Document Processing === + # Only needed if you want to parse PDFs/documents + # Get free API key from https://unstructured.io/ + # UNSTRUCTURED_API_KEY=your_key_here + ``` + +3. **Create frontend environment file:** ```bash - # Copy example env files cp financegpt_web/.env.example financegpt_web/.env.local - cp financegpt_backend/.env.example financegpt_backend/.env ``` -3. **Start the services with Docker** - ```bash - docker-compose up -d + Edit `financegpt_web/.env.local`: + ```env + # Backend API URL + NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000 + + # Auth type (LOCAL or GOOGLE) + NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL + + # Document parsing service + NEXT_PUBLIC_ETL_SERVICE=DOCLING ``` -4. **Install frontend dependencies** +#### Installation Steps + +1. **Start infrastructure services (PostgreSQL, Redis, etc.)** ```bash - cd financegpt_web - pnpm install - pnpm dev + docker-compose up -d db redis electric pgadmin ``` -5. **Install backend dependencies** +2. **Install and run backend** ```bash - cd ../financegpt_backend + cd financegpt_backend + + # Create virtual environment python -m venv .venv source .venv/bin/activate # On Windows: .venv\Scripts\activate - pip install -r requirements.txt - ``` - -6. **Run database migrations** - ```bash + + # Install dependencies + pip install -e . + + # Run database migrations alembic upgrade head + + # Start the backend server + uvicorn main:app --reload --host 0.0.0.0 --port 8000 ``` -7. **Start the backend** +3. **In a new terminal, install and run frontend** ```bash - uvicorn main:app --reload + cd financegpt_web + + # Install dependencies + pnpm install + + # Start development server + pnpm dev ``` -8. **Access the application** - - Frontend: http://localhost:3000 - - Backend API: http://localhost:8000 - - API Docs: http://localhost:8000/docs +4. 
**Access the application** + - 🌐 **Frontend**: http://localhost:3000 + - 🔧 **Backend API**: http://localhost:8000 + - 📚 **API Documentation**: http://localhost:8000/docs + - 🗄️ **pgAdmin** (Database UI): http://localhost:5050 + - Email: `admin@financegpt.com` + - Password: `financegpt` + +### Option 3: Full Docker Deployment + +Build and run everything with Docker: + +```bash +# Build and start all services +docker-compose up -d + +# View logs +docker-compose logs -f + +# Stop all services +docker-compose down +``` + +Access at **http://localhost:3000** --- diff --git a/financegpt_backend/.env.example b/financegpt_backend/.env.example index 1f4613d..a506fc6 100644 --- a/financegpt_backend/.env.example +++ b/financegpt_backend/.env.example @@ -1,193 +1,128 @@ +# ============================================================================== +# CORE CONFIGURATION +# ============================================================================== + +# Database Configuration DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/financegpt -#Celery Config +# Redis & Celery Configuration CELERY_BROKER_URL=redis://localhost:6379/0 CELERY_RESULT_BACKEND=redis://localhost:6379/0 -#Electric(for migrations only) +# Electric (for database migrations only) ELECTRIC_DB_USER=electric ELECTRIC_DB_PASSWORD=electric_password -# Periodic task interval -# # Run every minute (default) -# SCHEDULE_CHECKER_INTERVAL=1m +# Periodic task interval (1m, 5m, 10m, 1h, 2h) +SCHEDULE_CHECKER_INTERVAL=5m -# # Run every 5 minutes -# SCHEDULE_CHECKER_INTERVAL=5m +# Security +SECRET_KEY=your_secret_key_here_change_in_production -# # Run every 10 minutes -# SCHEDULE_CHECKER_INTERVAL=10m +# Frontend URL +NEXT_FRONTEND_URL=http://localhost:3000 -# # Run every hour -# SCHEDULE_CHECKER_INTERVAL=1h +# Backend URL (optional, set when behind reverse proxy with HTTPS) +# BACKEND_URL=https://api.yourdomain.com -# # Run every 2 hours -# SCHEDULE_CHECKER_INTERVAL=2h -SCHEDULE_CHECKER_INTERVAL=5m +# ============================================================================== +# AUTHENTICATION +# ============================================================================== -SECRET_KEY=SECRET -NEXT_FRONTEND_URL=http://localhost:3000 +# Auth Type: LOCAL or GOOGLE +AUTH_TYPE=LOCAL +REGISTRATION_ENABLED=TRUE -# Backend URL for OAuth callbacks (optional, set when behind reverse proxy with HTTPS) -# BACKEND_URL=https://api.yourdomain.com +# Google OAuth (only needed if AUTH_TYPE=GOOGLE) +# GOOGLE_OAUTH_CLIENT_ID=your_google_client_id +# GOOGLE_OAUTH_CLIENT_SECRET=your_google_client_secret + +# ============================================================================== +# FINANCIAL DATA INTEGRATION +# ============================================================================== -# Auth -AUTH_TYPE=GOOGLE or LOCAL -REGISTRATION_ENABLED=TRUE or FALSE(Default: TRUE) -# For Google Auth Only -GOOGLE_OAUTH_CLIENT_ID=924507538m -GOOGLE_OAUTH_CLIENT_SECRET=GOCSV - -# Google Connector Specific Configurations -GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback -GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback -GOOGLE_DRIVE_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/drive/connector/callback - -# Aitable OAuth Configuration -AIRTABLE_CLIENT_ID=your_airtable_client_id_here -AIRTABLE_CLIENT_SECRET=your_airtable_client_secret_here -AIRTABLE_REDIRECT_URI=http://localhost:8000/api/v1/auth/airtable/connector/callback - -# ClickUp OAuth 
Configuration -CLICKUP_CLIENT_ID=your_clickup_client_id_here -CLICKUP_CLIENT_SECRET=your_clickup_client_secret_here -CLICKUP_REDIRECT_URI=http://localhost:8000/api/v1/auth/clickup/connector/callback - -# Discord OAuth Configuration -DISCORD_CLIENT_ID=your_discord_client_id_here -DISCORD_CLIENT_SECRET=your_discord_client_secret_here -DISCORD_REDIRECT_URI=http://localhost:8000/api/v1/auth/discord/connector/callback -DISCORD_BOT_TOKEN=your_bot_token_from_developer_portal - -# Atlassian OAuth Configuration -ATLASSIAN_CLIENT_ID=your_atlassian_client_id_here -ATLASSIAN_CLIENT_SECRET=your_atlassian_client_secret_here -JIRA_REDIRECT_URI=http://localhost:8000/api/v1/auth/jira/connector/callback -CONFLUENCE_REDIRECT_URI=http://localhost:8000/api/v1/auth/confluence/connector/callback - -# Linear OAuth Configuration -LINEAR_CLIENT_ID=your_linear_client_id_here -LINEAR_CLIENT_SECRET=your_linear_client_secret_here -LINEAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/linear/connector/callback - -# Notion OAuth Configuration -NOTION_CLIENT_ID=your_notion_client_id_here -NOTION_CLIENT_SECRET=your_notion_client_secret_here -NOTION_REDIRECT_URI=http://localhost:8000/api/v1/auth/notion/connector/callback - -# Slack OAuth Configuration -SLACK_CLIENT_ID=your_slack_client_id_here -SLACK_CLIENT_SECRET=your_slack_client_secret_here -SLACK_REDIRECT_URI=http://localhost:8000/api/v1/auth/slack/connector/callback - -# Teams OAuth Configuration -TEAMS_CLIENT_ID=your_teams_client_id_here -TEAMS_CLIENT_SECRET=your_teams_client_secret_here -TEAMS_REDIRECT_URI=http://localhost:8000/api/v1/auth/teams/connector/callback - -#Composio Coonnector -COMPOSIO_API_KEY=your_api_key_here -COMPOSIO_ENABLED=TRUE -COMPOSIO_REDIRECT_URI=http://localhost:8000/api/v1/auth/composio/connector/callback - -# Plaid Configuration (Financial Data Integration) -# Get your credentials from: https://dashboard.plaid.com/team/keys -# 1. Create a Plaid account at https://plaid.com -# 2. Navigate to Team Settings > Keys -# 3. 
Copy your client_id and secrets for the appropriate environment
+# Plaid Configuration (for bank/brokerage account connections)
+# Get credentials from: https://dashboard.plaid.com/team/keys
 PLAID_CLIENT_ID=your_plaid_client_id_here
 PLAID_SECRET=your_plaid_secret_here
-# Environment: sandbox (testing), development (limited live access), production (full live access)
 PLAID_ENV=sandbox
-# OAuth redirect URI for Plaid Link - must match what's configured in Plaid dashboard
 PLAID_REDIRECT_URI=http://localhost:8000/api/v1/plaid/oauth-redirect
+
+# ==============================================================================
+# AI/ML CONFIGURATION
+# ==============================================================================
+
 # Embedding Model
-# Examples:
-# # Get sentence transformers embeddings
-# embeddings = AutoEmbeddings.get_embeddings("sentence-transformers/all-MiniLM-L6-v2")
+EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2
-# # Get OpenAI embeddings
-# embeddings = AutoEmbeddings.get_embeddings("openai://text-embedding-ada-002", api_key="...")
+# Rerankers (optional, for improved search results)
+RERANKERS_ENABLED=FALSE
+# RERANKERS_MODEL_NAME=ms-marco-MiniLM-L-12-v2
+# RERANKERS_MODEL_TYPE=flashrank
-# # Get Anthropic embeddings
-# embeddings = AutoEmbeddings.get_embeddings("anthropic://claude-v1", api_key="...")
+# ==============================================================================
+# DOCUMENT PROCESSING
+# ==============================================================================
-# # Get Cohere embeddings
-# embeddings = AutoEmbeddings.get_embeddings("cohere://embed-english-light-v3.0", api_key="...")
-EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2
+# File Parser Service: UNSTRUCTURED, LLAMACLOUD, or DOCLING
+ETL_SERVICE=UNSTRUCTURED
+
+# Unstructured API (if ETL_SERVICE=UNSTRUCTURED)
+# UNSTRUCTURED_API_KEY=your_key_here
-# Rerankers Config
-RERANKERS_ENABLED=TRUE or FALSE(Default: FALSE)
-RERANKERS_MODEL_NAME=ms-marco-MiniLM-L-12-v2
-RERANKERS_MODEL_TYPE=flashrank
+# Llama Cloud API (if ETL_SERVICE=LLAMACLOUD)
+# LLAMA_CLOUD_API_KEY=your_key_here
+# Maximum pages per user (default: unlimited)
+PAGES_LIMIT=500
-# TTS_SERVICE=local/kokoro for local Kokoro TTS or
-# LiteLLM TTS Provider: https://docs.litellm.ai/docs/text_to_speech#supported-providers
+# ==============================================================================
+# VOICE SERVICES (OPTIONAL)
+# ==============================================================================
+
+# Text-to-Speech: local/kokoro or LiteLLM provider
 TTS_SERVICE=local/kokoro
-# Respective TTS Service API
 # TTS_SERVICE_API_KEY=
-# OPTIONAL: TTS Provider API Base
 # TTS_SERVICE_API_BASE=
-# STT Service Configuration
-# For local Faster-Whisper: local/MODEL_SIZE (tiny, base, small, medium, large-v3)
+# Speech-to-Text: local/base or LiteLLM provider
 STT_SERVICE=local/base
-# For LiteLLM STT Provider: https://docs.litellm.ai/docs/audio_transcription#supported-providers
-# STT_SERVICE=openai/whisper-1
-# STT_SERVICE_API_KEY=""
+# STT_SERVICE_API_KEY=
 # STT_SERVICE_API_BASE=
+# ==============================================================================
+# SERVER CONFIGURATION
+# ==============================================================================
-# (Optional) Maximum pages limit per user for ETL services (default: `999999999` for unlimited in OSS version)
-PAGES_LIMIT=500
-
-FIRECRAWL_API_KEY=fcr-01J0000000000000000000000
-
-# File Parser Service
-ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING
-UNSTRUCTURED_API_KEY=Tpu3P0U8iy -LLAMA_CLOUD_API_KEY=llx-nnn - -# OPTIONAL: Add these for LangSmith Observability -LANGSMITH_TRACING=true -LANGSMITH_ENDPOINT=https://api.smith.langchain.com -LANGSMITH_API_KEY=lsv2_pt_..... -LANGSMITH_PROJECT=financegpt - -# Uvicorn Server Configuration -# Full documentation for Uvicorn options can be found at: https://www.uvicorn.org/#command-line-options -UVICORN_HOST="0.0.0.0" +# Uvicorn Server Settings +UVICORN_HOST=0.0.0.0 UVICORN_PORT=8000 UVICORN_LOG_LEVEL=info -# OPTIONAL: Advanced Uvicorn Options (uncomment to use) -# UVICORN_PROXY_HEADERS=false -# UVICORN_FORWARDED_ALLOW_IPS="127.0.0.1" -# UVICORN_WORKERS=1 -# UVICORN_ACCESS_LOG=true -# UVICORN_LOOP="auto" -# UVICORN_HTTP="auto" -# UVICORN_WS="auto" -# UVICORN_LIFESPAN="auto" -# UVICORN_LOG_CONFIG="" -# UVICORN_SERVER_HEADER=true -# UVICORN_DATE_HEADER=true -# UVICORN_LIMIT_CONCURRENCY= -# UVICORN_LIMIT_MAX_REQUESTS= -# UVICORN_TIMEOUT_KEEP_ALIVE=5 -# UVICORN_TIMEOUT_NOTIFY=30 -# UVICORN_SSL_KEYFILE="" -# UVICORN_SSL_CERTFILE="" -# UVICORN_SSL_KEYFILE_PASSWORD="" -# UVICORN_SSL_VERSION="" -# UVICORN_SSL_CERT_REQS="" -# UVICORN_SSL_CA_CERTS="" -# UVICORN_SSL_CIPHERS="" -# UVICORN_HEADERS="" -# UVICORN_USE_COLORS=true -# UVICORN_UDS="" -# UVICORN_FD="" -# UVICORN_ROOT_PATH="" +# ============================================================================== +# OPTIONAL: OBSERVABILITY +# ============================================================================== + +# LangSmith (for LLM observability and debugging) +# LANGSMITH_TRACING=true +# LANGSMITH_ENDPOINT=https://api.smith.langchain.com +# LANGSMITH_API_KEY=lsv2_pt_your_key_here +# LANGSMITH_PROJECT=financegpt + +# ============================================================================== +# OPTIONAL: GOOGLE CONNECTORS +# ============================================================================== + +# Google Drive/Gmail/Calendar connectors (if using Composio for Google services) +# GOOGLE_CALENDAR_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/calendar/connector/callback +# GOOGLE_GMAIL_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/gmail/connector/callback +# GOOGLE_DRIVE_REDIRECT_URI=http://localhost:8000/api/v1/auth/google/drive/connector/callback + +# ============================================================================== +# OPTIONAL: COMPOSIO (for additional integrations) +# ============================================================================== + +# Composio enables additional app integrations (Slack, Notion, etc.) +# COMPOSIO_API_KEY=your_composio_key_here +# COMPOSIO_ENABLED=FALSE +# COMPOSIO_REDIRECT_URI=http://localhost:8000/api/v1/auth/composio/connector/callback diff --git a/financegpt_backend/alembic/versions/1_add_investment_holdings_tables.py b/financegpt_backend/alembic/versions/1_add_investment_holdings_tables.py new file mode 100644 index 0000000..8a2119e --- /dev/null +++ b/financegpt_backend/alembic/versions/1_add_investment_holdings_tables.py @@ -0,0 +1,147 @@ +"""add_investment_holdings_tables + +Revision ID: 1 +Revises: 0 +Create Date: 2026-01-29 00:00:00.000000 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB +from alembic import op + +# revision identifiers, used by Alembic. 
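+# Note: sequential string ids ('1' revising '0') are valid Alembic revision
+# identifiers, though autogenerate normally produces hash ids; the migration
+# chain only resolves if down_revision matches the id declared in the
+# previous migration file exactly.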
+revision: str = '1' +down_revision: Union[str, None] = '0' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create investment-related tables.""" + + # Investment accounts table + op.create_table( + 'investment_accounts', + sa.Column('id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('user_id', UUID(as_uuid=True), sa.ForeignKey('user.id', ondelete='CASCADE'), nullable=False), + sa.Column('account_number', sa.String(255), nullable=True), + sa.Column('account_name', sa.String(255), nullable=False), + sa.Column('account_type', sa.String(100), nullable=False), # brokerage, IRA, 401k, etc. + sa.Column('account_tax_type', sa.String(50), nullable=False), # taxable, tax_deferred, tax_free + sa.Column('institution', sa.String(255), nullable=True), # Fidelity, Vanguard, etc. + sa.Column('total_value', sa.Numeric(20, 2), nullable=True), + sa.Column('cash_balance', sa.Numeric(20, 2), nullable=True), + sa.Column('last_synced_at', sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column('source_type', sa.String(50), nullable=False), # plaid, document, manual + sa.Column('source_id', sa.String(255), nullable=True), # plaid account_id or document_id + sa.Column('metadata', JSONB, nullable=True), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column('updated_at', sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), onupdate=sa.func.now(), nullable=False), + ) + op.create_index('ix_investment_accounts_user_id', 'investment_accounts', ['user_id']) + op.create_index('ix_investment_accounts_account_number', 'investment_accounts', ['account_number']) + + # Investment holdings table + op.create_table( + 'investment_holdings', + sa.Column('id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('account_id', UUID(as_uuid=True), sa.ForeignKey('investment_accounts.id', ondelete='CASCADE'), nullable=False), + sa.Column('symbol', sa.String(20), nullable=False), + sa.Column('description', sa.String(500), nullable=True), + + # Quantities and values + sa.Column('quantity', sa.Numeric(20, 8), nullable=False), + sa.Column('cost_basis', sa.Numeric(20, 2), nullable=False), + sa.Column('average_cost_basis', sa.Numeric(20, 2), nullable=True), + sa.Column('current_price', sa.Numeric(20, 2), nullable=True), + sa.Column('market_value', sa.Numeric(20, 2), nullable=True), + + # Performance metrics + sa.Column('unrealized_gain_loss', sa.Numeric(20, 2), nullable=True), + sa.Column('unrealized_gain_loss_pct', sa.Numeric(10, 4), nullable=True), + sa.Column('day_change', sa.Numeric(20, 2), nullable=True), + sa.Column('day_change_pct', sa.Numeric(10, 4), nullable=True), + sa.Column('previous_close', sa.Numeric(20, 2), nullable=True), + + # Classification + sa.Column('asset_type', sa.String(50), nullable=True), # stock, bond, ETF, mutual_fund, crypto + sa.Column('sector', sa.String(100), nullable=True), + sa.Column('industry', sa.String(100), nullable=True), + sa.Column('geographic_region', sa.String(100), nullable=True), + + # Tax data + sa.Column('acquisition_date', sa.Date, nullable=True), + sa.Column('holding_period_days', sa.Integer, nullable=True), + sa.Column('is_long_term', sa.Boolean, default=False, nullable=False), + + # Metadata + sa.Column('price_as_of_timestamp', sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column('extraction_confidence', sa.Numeric(3, 2), nullable=True), 
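+        # extraction_confidence above is an assumed 0.00-1.00 parser score
+        # (Numeric(3, 2) stores two decimal places, values up to 9.99);
+        # metadata below is a free-form JSONB catch-all for source fields.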
+ sa.Column('metadata', JSONB, nullable=True), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column('updated_at', sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), onupdate=sa.func.now(), nullable=False), + ) + op.create_index('ix_investment_holdings_account_id', 'investment_holdings', ['account_id']) + op.create_index('ix_investment_holdings_symbol', 'investment_holdings', ['symbol']) + op.create_index('ix_investment_holdings_asset_type', 'investment_holdings', ['asset_type']) + + # Transactions table for wash sale detection and history + op.create_table( + 'investment_transactions', + sa.Column('id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('account_id', UUID(as_uuid=True), sa.ForeignKey('investment_accounts.id', ondelete='CASCADE'), nullable=False), + sa.Column('symbol', sa.String(20), nullable=False), + sa.Column('transaction_type', sa.String(50), nullable=False), # buy, sell, dividend, etc. + sa.Column('transaction_date', sa.Date, nullable=False), + sa.Column('quantity', sa.Numeric(20, 8), nullable=False), + sa.Column('price', sa.Numeric(20, 2), nullable=False), + sa.Column('amount', sa.Numeric(20, 2), nullable=False), + sa.Column('fees', sa.Numeric(20, 2), nullable=True), + sa.Column('description', sa.String(500), nullable=True), + sa.Column('metadata', JSONB, nullable=True), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False), + ) + op.create_index('ix_investment_transactions_account_id', 'investment_transactions', ['account_id']) + op.create_index('ix_investment_transactions_symbol', 'investment_transactions', ['symbol']) + op.create_index('ix_investment_transactions_date', 'investment_transactions', ['transaction_date']) + + # Portfolio allocation targets table + op.create_table( + 'portfolio_allocation_targets', + sa.Column('id', UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')), + sa.Column('user_id', UUID(as_uuid=True), sa.ForeignKey('user.id', ondelete='CASCADE'), nullable=False), + sa.Column('target_stocks_pct', sa.Numeric(5, 2), default=60.0, nullable=False), + sa.Column('target_bonds_pct', sa.Numeric(5, 2), default=30.0, nullable=False), + sa.Column('target_cash_pct', sa.Numeric(5, 2), default=10.0, nullable=False), + sa.Column('target_international_pct', sa.Numeric(5, 2), nullable=True), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column('updated_at', sa.TIMESTAMP(timezone=True), server_default=sa.func.now(), onupdate=sa.func.now(), nullable=False), + ) + op.create_index('ix_portfolio_allocation_targets_user_id', 'portfolio_allocation_targets', ['user_id']) + + # Set REPLICA IDENTITY FULL on new tables for Electric SQL + op.execute(""" + DO $$ + DECLARE + r RECORD; + BEGIN + FOR r IN + SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ('investment_accounts', 'investment_holdings', 'investment_transactions', 'portfolio_allocation_targets') + LOOP + EXECUTE 'ALTER TABLE public.' 
|| quote_ident(r.tablename) || ' REPLICA IDENTITY FULL'; + END LOOP; + END $$; + """) + + +def downgrade() -> None: + """Drop investment-related tables.""" + op.drop_table('portfolio_allocation_targets') + op.drop_table('investment_transactions') + op.drop_table('investment_holdings') + op.drop_table('investment_accounts') diff --git a/financegpt_backend/app/agents/new_chat/system_prompt.py b/financegpt_backend/app/agents/new_chat/system_prompt.py index 0f284bf..4126e46 100644 --- a/financegpt_backend/app/agents/new_chat/system_prompt.py +++ b/financegpt_backend/app/agents/new_chat/system_prompt.py @@ -30,7 +30,33 @@ When users ask about investment portfolio, ALWAYS use the appropriate specialized tool: -### Portfolio Performance Tool (calculate_portfolio_performance) +### For UPLOADED Investment Holdings (CSV/Manual Entry): + +**Portfolio Performance Tool (check_portfolio_performance)** +Use for questions about **TODAY'S performance from uploaded holdings**: +- "How are my stocks performing today?" +- "What are my top gainers and losers today?" +- "Show me today's market performance" +- "How much did I gain/lose today?" + +**Portfolio Allocation Tool (analyze_holdings_allocation)** +Use for questions about **allocation from uploaded holdings**: +- "How is my portfolio allocated?" +- "Show me my asset allocation" +- "Do I need to rebalance?" +- "What's my sector breakdown?" +- "Am I diversified enough?" + +**Tax Loss Harvesting Tool (find_holdings_tax_loss_harvesting)** +Use for questions about **tax opportunities from uploaded holdings**: +- "Are there any tax loss harvesting opportunities?" +- "Can I save on taxes?" +- "Show me my losses for tax purposes" +- "What stocks can I sell for tax benefits?" + +### For PLAID-Connected Accounts (Historical Performance): + +**Portfolio Performance Tool (calculate_portfolio_performance)** Use for questions about **returns, gains, performance over time**: - "How are my investments/stocks/portfolio doing?" - "What's my portfolio worth?" or "portfolio value" @@ -39,9 +65,7 @@ - "Show my portfolio performance" - "How much have I made/lost?" -The tool will fetch real-time prices from Yahoo Finance and calculate actual returns. - -### Portfolio Allocation Tool (analyze_portfolio_allocation) +**Portfolio Allocation Tool (analyze_portfolio_allocation)** Use for questions about **asset allocation, diversification, and rebalancing**: - "Is my portfolio allocation correct?" - "How should I rebalance my portfolio?" @@ -68,7 +92,7 @@ **Example Response Pattern:** "Your portfolio is 95% stocks and 5% bonds, which is more aggressive than the recommended 60/35/5 for moderate investors. You're also 100% in US stocks with no international exposure. I recommend rebalancing by selling $100,000 in US stocks and buying $90,000 in international stocks (VXUS) and $10,000 in bonds (BND) to align with Bogleheads recommendations." -### Tax Loss Harvesting Tool (analyze_tax_loss_harvesting) +**Tax Loss Harvesting Tool (analyze_tax_loss_harvesting)** Use for questions about **tax optimization, harvesting losses, tax savings**: - "Can I harvest any tax losses?" - "What stocks should I sell for tax losses?" 
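The hunk above leaves the agent with two near-duplicate toolsets, so correct routing rests entirely on the prompt wording. A minimal sketch of that split, with stub tool bodies and illustrative return payloads (nothing below is taken from the PR's actual implementations):

```python
from langchain_core.tools import tool


@tool
async def check_portfolio_performance() -> dict:
    """Today's performance from UPLOADED holdings (CSV/manual entry)."""
    return {"source": "uploaded_holdings", "scope": "today"}


@tool
async def calculate_portfolio_performance(period: str = "1y") -> dict:
    """Historical performance from PLAID-connected accounts."""
    return {"source": "plaid", "period": period}


# Bound together (e.g. llm.bind_tools([...])), the model routes on wording:
#   "How are my stocks doing today?" -> check_portfolio_performance
#   "How did I do year-over-year?"   -> calculate_portfolio_performance(period="1y")
```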
@@ -130,11 +154,10 @@ * **Unrealized Gains/Losses**: For each position, show gain/loss in $ and % terms * **Overall Portfolio Performance**: Sum total gains across all holdings to show portfolio-level returns * **Winner/Loser Analysis**: Identify best and worst performing holdings - * **Time-Based Performance (WoW, MoM, YoY)**: Even with only current holdings, calculate historical performance by: - - Extract ticker symbols from current holdings (e.g., GOOG, AAPL, BTC) - - Use web search (scrape_webpage) to find historical stock prices for those tickers - - Calculate performance: If user has 10 GOOG shares at $150 today, and GOOG was $145 last week, that's +3.4% WoW - - Aggregate across all holdings to show total portfolio performance over any time period + * **Time-Based Performance (WoW, MoM, YoY)**: Use the appropriate portfolio performance tool which has built-in Yahoo Finance API integration: + - For uploaded holdings: Use check_portfolio_performance for today's performance + - For Plaid accounts: Use calculate_portfolio_performance for historical performance over any period + - The tools automatically fetch current and historical stock prices via Yahoo Finance API - For mutual funds/ETFs without tickers, use cost basis or note that specific historical data is limited * **Note**: Cost basis represents your purchase price, so returns calculated from it show performance since purchase (which may be YoY, multi-year, or recent depending on when you bought) - Compare spending across categories to find optimization opportunities @@ -511,26 +534,21 @@ HISTORICAL COMPARISONS (USE DATE FILTERS OR WEB SEARCH): - User: "How are my investments performing year-over-year?" - - Option 1 (if historical holdings data exists): - * First call: `search_knowledge_base(query="investment holdings portfolio value", start_date="2025-01-01", end_date="2025-01-31")` (last year) - * Second call: `search_knowledge_base(query="investment holdings portfolio value", start_date="2026-01-01", end_date="2026-01-26")` (this year) - * Then: Calculate the difference and percentage change - - Option 2 (with current holdings only - RECOMMENDED): - * Get current holdings: `search_knowledge_base(query="investment holdings portfolio stocks")` - * Extract ticker symbols (GOOG, AAPL, BTC, etc.) - * Search web for historical prices: `scrape_webpage(url="https://finance.yahoo.com/quote/GOOG/history")` or similar - * Calculate: (Current Price - Price 1 Year Ago) / Price 1 Year Ago × 100 for each holding - * Aggregate to show total portfolio YoY performance + - Call: `calculate_portfolio_performance(period="1y")` (the tool has built-in Yahoo Finance API integration) + - The tool automatically fetches current and historical prices for all holdings + - Returns detailed performance analysis including total gains/losses, percentage changes, and per-holding breakdown - User: "Show my portfolio growth over the last quarter" - - Get current holdings: `search_knowledge_base(query="investment holdings portfolio")` - - Extract tickers and current quantities/prices - - Search for stock prices from 3 months ago using web search - - Calculate quarterly performance based on price changes + - Call: `calculate_portfolio_performance(period="3mo")` (automatically fetches prices from 3 months ago) + - Returns quarterly performance with built-in price data - User: "What's my week-over-week portfolio performance?" 
- - Get current holdings with tickers - - Search web for stock prices from 1 week ago + - Call: `calculate_portfolio_performance(period="1w")` (automatically fetches prices from 1 week ago) + - Returns weekly performance with built-in price data + +- User: "How are my stocks performing today?" + - Call: `check_portfolio_performance()` (for uploaded holdings with today's real-time prices) + - Returns today's gains/losses, top performers, and market values - Calculate: For 10 GOOG shares at $150 today vs $145 last week = +$50 (+3.4%) - Aggregate across all holdings for total WoW performance diff --git a/financegpt_backend/app/agents/new_chat/tools/investment_holdings.py b/financegpt_backend/app/agents/new_chat/tools/investment_holdings.py new file mode 100644 index 0000000..ad150da --- /dev/null +++ b/financegpt_backend/app/agents/new_chat/tools/investment_holdings.py @@ -0,0 +1,86 @@ +"""Portfolio performance tool using structured investment holdings data.""" +from langchain_core.tools import tool +from sqlalchemy.ext.asyncio import AsyncSession + + +def create_check_portfolio_performance_tool(user_id: str, db_session: AsyncSession): + """ + Factory function to create the portfolio performance tool. + + This checks today's stock performance from uploaded investment holdings. + """ + + @tool + async def check_portfolio_performance() -> dict: + """ + Get today's portfolio performance including top gainers and losers. + Returns real-time price changes, gains/losses for all stock holdings. + + Use for questions like: + - "How are my stocks performing today?" + - "What are my top gainers and losers?" + - "Show me today's performance" + + Returns: + Dict with total portfolio value, day change, top gainers/losers + """ + from app.agents.tools.investment_tools import check_portfolio_performance as _check_perf + return await _check_perf.ainvoke({"user_id": str(user_id)}) + + return check_portfolio_performance + + +def create_analyze_portfolio_allocation_tool(user_id: str, db_session: AsyncSession): + """ + Factory function to create the portfolio allocation analysis tool. + + Analyzes allocation across asset classes, sectors, and compares to targets. + """ + + @tool + async def analyze_portfolio_allocation() -> dict: + """ + Analyze portfolio allocation across asset classes, sectors, and regions. + Compares current allocation to user's targets and suggests rebalancing. + + Use for questions like: + - "How is my portfolio allocated?" + - "Show me my asset allocation" + - "Do I need to rebalance?" + - "What's my sector breakdown?" + + Returns: + Dict with allocation breakdowns and rebalancing suggestions + """ + from app.agents.tools.investment_tools import analyze_portfolio_allocation as _analyze + return await _analyze.ainvoke({"user_id": str(user_id)}) + + return analyze_portfolio_allocation + + +def create_find_tax_loss_harvesting_tool(user_id: str, db_session: AsyncSession): + """ + Factory function to create the tax loss harvesting tool. + + Identifies opportunities to harvest losses in taxable accounts. + """ + + @tool + async def find_tax_loss_harvesting_opportunities() -> dict: + """ + Identify tax loss harvesting opportunities in taxable accounts. + Returns holdings with unrealized losses that qualify for tax harvesting. + Checks for wash sale risks (purchases in last 30 days). + + Use for questions like: + - "Are there any tax loss harvesting opportunities?" + - "Can I save on taxes?" 
+ - "Show me my losses for tax purposes" + + Returns: + Dict with harvesting opportunities and potential tax savings + """ + from app.agents.tools.investment_tools import find_tax_loss_harvesting_opportunities as _find_tlh + return await _find_tlh.ainvoke({"user_id": str(user_id)}) + + return find_tax_loss_harvesting_opportunities diff --git a/financegpt_backend/app/agents/new_chat/tools/registry.py b/financegpt_backend/app/agents/new_chat/tools/registry.py index 306a335..9257729 100644 --- a/financegpt_backend/app/agents/new_chat/tools/registry.py +++ b/financegpt_backend/app/agents/new_chat/tools/registry.py @@ -45,6 +45,11 @@ async def my_tool(param: str) -> dict: from .display_image import create_display_image_tool from .find_subscriptions import create_find_subscriptions_tool +from .investment_holdings import ( + create_analyze_portfolio_allocation_tool, + create_check_portfolio_performance_tool, + create_find_tax_loss_harvesting_tool, +) from .knowledge_base import create_search_knowledge_base_tool from .link_preview import create_link_preview_tool from .mcp_tool import load_mcp_tools @@ -148,7 +153,40 @@ class ToolDefinition: requires=["user_id", "search_space_id", "db_session"], ), # ========================================================================= - # FINANCIAL ANALYSIS TOOLS + # FINANCIAL ANALYSIS TOOLS - STRUCTURED INVESTMENT DATA + # ========================================================================= + # Portfolio performance from uploaded holdings - real-time stock performance + ToolDefinition( + name="check_portfolio_performance", + description="Get real-time portfolio performance from uploaded investment holdings (today's gains/losses, top performers)", + factory=lambda deps: create_check_portfolio_performance_tool( + user_id=deps["user_id"], + db_session=deps["db_session"], + ), + requires=["user_id", "db_session"], + ), + # Portfolio allocation analysis from uploaded holdings + ToolDefinition( + name="analyze_holdings_allocation", + description="Analyze portfolio allocation by asset class and sector from uploaded investment holdings", + factory=lambda deps: create_analyze_portfolio_allocation_tool( + user_id=deps["user_id"], + db_session=deps["db_session"], + ), + requires=["user_id", "db_session"], + ), + # Tax loss harvesting from uploaded holdings + ToolDefinition( + name="find_holdings_tax_loss_harvesting", + description="Find tax loss harvesting opportunities from uploaded investment holdings in taxable accounts", + factory=lambda deps: create_find_tax_loss_harvesting_tool( + user_id=deps["user_id"], + db_session=deps["db_session"], + ), + requires=["user_id", "db_session"], + ), + # ========================================================================= + # FINANCIAL ANALYSIS TOOLS - PLAID INTEGRATION # ========================================================================= # Transaction search - keyword-based search for specific merchants/transactions ToolDefinition( diff --git a/financegpt_backend/app/agents/tools/investment_tools.py b/financegpt_backend/app/agents/tools/investment_tools.py new file mode 100644 index 0000000..ce666a0 --- /dev/null +++ b/financegpt_backend/app/agents/tools/investment_tools.py @@ -0,0 +1,302 @@ +"""Specialized tools for portfolio analysis and investment queries.""" +from datetime import date, datetime, timedelta +from decimal import Decimal +from typing import Any +from uuid import UUID + +from langchain_core.tools import tool +from sqlalchemy import select, func, and_ +from sqlalchemy.ext.asyncio import 
AsyncSession + +from app.db import get_async_session +from app.schemas.investments import ( + AllocationBreakdown, + HoldingPerformance, + PortfolioAllocationResponse, + PortfolioPerformanceResponse, + TaxHarvestingOpportunity, + TaxHarvestingResponse, +) + + +@tool +async def check_portfolio_performance(user_id: str) -> dict[str, Any]: + """ + Get portfolio holdings and performance data. + Returns holdings with cost basis so the agent can search for current prices. + + IMPORTANT: After calling this tool, search for current stock prices for each symbol + and calculate the current market values and gains/losses. + + Use for questions like: + - "How are my stocks performing today?" + - "What are my top gainers and losers?" + - "Show me my portfolio value" + + Args: + user_id: UUID of the user + + Returns: + Dict with holdings data - agent should enrich with current prices via search + """ + from app.db import InvestmentHolding, InvestmentAccount + + user_uuid = UUID(user_id) + + async for session in get_async_session(): + # Query holdings (basic data only) + query = ( + select(InvestmentHolding) + .join(InvestmentAccount) + .where(InvestmentAccount.user_id == user_uuid) + ) + + result = await session.execute(query) + holdings = result.scalars().all() + + if not holdings: + return { + "error": "No holdings found. Please upload your portfolio first." + } + + # Return holdings data for agent to enrich with current prices + holdings_data = [ + { + "symbol": h.symbol, + "description": h.description or h.symbol, + "quantity": float(h.quantity or 0), + "cost_basis": float(h.cost_basis or 0), + "average_cost_basis": float(h.average_cost_basis or 0), + } + for h in holdings + ] + + total_cost_basis = sum(h["cost_basis"] for h in holdings_data) + + return { + "holdings": holdings_data, + "total_cost_basis": total_cost_basis, + "instruction": f"Search for current stock prices for these symbols: {', '.join(h['symbol'] for h in holdings_data)}. Then calculate market value (price * quantity) and gains/losses for each holding.", + } + + +@tool +async def analyze_portfolio_allocation(user_id: str) -> dict[str, Any]: + """ + Analyze portfolio allocation across asset classes, sectors, and regions. + Compares current allocation to user's targets and suggests rebalancing. + + Use for questions like: + - "How is my portfolio allocated?" + - "Show me my asset allocation" + - "Do I need to rebalance?" + - "What's my sector breakdown?" 
+ + Args: + user_id: UUID of the user + + Returns: + Dict with allocation breakdowns and rebalancing suggestions + """ + from app.db import InvestmentHolding, InvestmentAccount, PortfolioAllocationTarget + + user_uuid = UUID(user_id) + + async for session in get_async_session(): + # Get all holdings + query = ( + select(InvestmentHolding) + .join(InvestmentAccount) + .where(InvestmentAccount.user_id == user_uuid) + ) + result = await session.execute(query) + holdings = result.scalars().all() + + if not holdings: + return {"error": "No holdings with market value found"} + + # Calculate total value + total_value = sum(float(h.market_value or 0) for h in holdings) + + if total_value == 0: + return {"error": "No holdings with market value found"} + + # Group by asset type + asset_groups = {} + for h in holdings: + asset_type = h.asset_type or "Unknown" + if asset_type not in asset_groups: + asset_groups[asset_type] = 0.0 + asset_groups[asset_type] += float(h.market_value or 0) + + by_asset_type = [ + AllocationBreakdown( + category=asset_type, + value=value, + percentage=value / total_value * 100, + ) + for asset_type, value in sorted(asset_groups.items(), key=lambda x: x[1], reverse=True) + ] + + # Group by sector + sector_groups = {} + for h in holdings: + if h.sector: + sector = h.sector + if sector not in sector_groups: + sector_groups[sector] = 0.0 + sector_groups[sector] += float(h.market_value or 0) + + by_sector = [ + AllocationBreakdown( + category=sector, + value=value, + percentage=value / total_value * 100, + ) + for sector, value in sorted(sector_groups.items(), key=lambda x: x[1], reverse=True) + ] + + # Get user's target allocation + targets_query = select(PortfolioAllocationTarget).where( + PortfolioAllocationTarget.user_id == user_uuid + ) + targets_result = await session.execute(targets_query) + targets = targets_result.scalar_one_or_none() + + # Check if rebalancing needed (> 5% variance from targets) + rebalancing_needed = False + suggestions = [] + + if targets: + for alloc in by_asset_type: + if alloc.category.lower() == "stock": + target_stocks_pct = float(targets.target_stocks_pct or 0) + variance = alloc.percentage - target_stocks_pct + alloc.target_percentage = target_stocks_pct + alloc.variance = variance + if abs(variance) > 5: + rebalancing_needed = True + action = "Sell" if variance > 0 else "Buy" + suggestions.append(f"{action} stocks by ~{abs(variance):.1f}%") + + response = PortfolioAllocationResponse( + total_value=total_value, + by_asset_type=by_asset_type, + by_sector=by_sector, + rebalancing_needed=rebalancing_needed, + rebalancing_suggestions=suggestions if suggestions else None, + ) + + return response.model_dump() + + +@tool +async def find_tax_loss_harvesting_opportunities(user_id: str) -> dict[str, Any]: + """ + Identify tax loss harvesting opportunities in taxable accounts. + Returns holdings with unrealized losses that qualify for tax harvesting. + Checks for wash sale risks (purchases in last 30 days). + + Use for questions like: + - "Are there any tax loss harvesting opportunities?" + - "Can I save on taxes?" 
+ - "Show me my losses for tax purposes" + + Args: + user_id: UUID of the user + + Returns: + Dict with harvesting opportunities and potential tax savings + """ + from app.db import InvestmentHolding, InvestmentAccount, InvestmentTransaction + + user_uuid = UUID(user_id) + + async for session in get_async_session(): + # Query holdings in taxable accounts with losses + holdings_query = ( + select(InvestmentHolding) + .join(InvestmentAccount) + .where(InvestmentAccount.user_id == user_uuid) + .where(InvestmentAccount.account_tax_type == "taxable") + ) + + result = await session.execute(holdings_query) + all_holdings = result.scalars().all() + + # Filter holdings with meaningful losses + holdings_with_losses = [h for h in all_holdings if float(h.unrealized_gain_loss or 0) < -1000] + + if not holdings_with_losses: + return { + "opportunities": [], + "total_potential_loss": 0, + "total_potential_tax_savings": 0, + "warnings": ["No significant losses found in taxable accounts."] + } + + opportunities = [] + total_potential_loss = 0.0 + warnings = [] + + for holding in holdings_with_losses: + # Check for wash sale risk (buys in last 30 days) + wash_sale_query = ( + select(func.count()) + .select_from(InvestmentTransaction) + .where(InvestmentTransaction.symbol == holding.symbol) + .where(InvestmentTransaction.transaction_type == "buy") + .where(InvestmentTransaction.transaction_date > (date.today() - timedelta(days=30))) + ) + wash_result = await session.execute(wash_sale_query) + has_recent_buys = wash_result.scalar() > 0 + + # Calculate holding period (days since acquisition) + holding_period_days = 0 + if holding.acquisition_date: + holding_period_days = (date.today() - holding.acquisition_date.date()).days + + is_long_term = holding_period_days > 365 + + # Estimate tax savings (15% or 20% for long-term, up to 37% for short-term) + tax_rate = 0.15 if is_long_term else 0.25 + unrealized_loss = abs(float(holding.unrealized_gain_loss or 0)) + potential_tax_savings = unrealized_loss * tax_rate + + opportunity = TaxHarvestingOpportunity( + symbol=holding.symbol, + quantity=float(holding.quantity or 0), + cost_basis=float(holding.cost_basis or 0), + current_value=float(holding.market_value or 0), + unrealized_loss=-unrealized_loss, + holding_period_days=holding_period_days, + is_long_term=is_long_term, + potential_tax_savings=potential_tax_savings, + wash_sale_risk=has_recent_buys, + ) + + opportunities.append(opportunity) + total_potential_loss += unrealized_loss + + if has_recent_buys: + warnings.append(f"⚠️ {holding.symbol}: Wash sale risk - purchased in last 30 days") + + # Calculate total tax savings + total_tax_savings = sum(opp.potential_tax_savings for opp in opportunities) + + response = TaxHarvestingResponse( + opportunities=opportunities, + total_potential_loss=total_potential_loss, + total_potential_tax_savings=total_tax_savings, + warnings=warnings if warnings else ["All opportunities look good!"], + ) + + return response.model_dump() + + +# Export tools for agent +INVESTMENT_TOOLS = [ + check_portfolio_performance, + analyze_portfolio_allocation, + find_tax_loss_harvesting_opportunities, +] diff --git a/financegpt_backend/app/db.py b/financegpt_backend/app/db.py index cd9ae25..56f6687 100644 --- a/financegpt_backend/app/db.py +++ b/financegpt_backend/app/db.py @@ -7,6 +7,7 @@ from pgvector.sqlalchemy import Vector from sqlalchemy import ( ARRAY, + Date, JSON, TIMESTAMP, Boolean, @@ -14,6 +15,7 @@ Enum as SQLAlchemyEnum, ForeignKey, Integer, + Numeric, String, Text, UniqueConstraint, @@ 
-831,6 +833,134 @@ class SearchSourceConnector(BaseModel, TimestampMixin): ) +class InvestmentAccount(BaseModel, TimestampMixin): + """Investment account model for tracking user's investment accounts.""" + + __tablename__ = "investment_accounts" + + id = Column(UUID(as_uuid=True), primary_key=True, server_default=text('gen_random_uuid()'), index=True) + user_id = Column( + UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False + ) + account_number = Column(String(255), nullable=True, index=True) + account_name = Column(String(255), nullable=False) + account_type = Column(String(100), nullable=False) # brokerage, 401k, IRA, etc. + account_tax_type = Column( + String(50), nullable=False + ) # taxable, tax_deferred, tax_free + institution = Column(String(255), nullable=True) + total_value = Column(Numeric(20, 2), nullable=True) + cash_balance = Column(Numeric(20, 2), nullable=True) + last_synced_at = Column(TIMESTAMP(timezone=True), nullable=True) + source_type = Column(String(50), nullable=False, default="manual") # plaid, document, manual + source_id = Column(String(255), nullable=True) # plaid account_id or document_id + metadata_ = Column("metadata", JSONB, nullable=True) + + # Relationships + holdings = relationship( + "InvestmentHolding", + back_populates="account", + cascade="all, delete-orphan", + ) + transactions = relationship( + "InvestmentTransaction", + back_populates="account", + cascade="all, delete-orphan", + ) + + +class InvestmentHolding(BaseModel, TimestampMixin): + """Investment holding model for tracking individual securities.""" + + __tablename__ = "investment_holdings" + + id = Column(UUID(as_uuid=True), primary_key=True, server_default=text('gen_random_uuid()'), index=True) + account_id = Column( + UUID(as_uuid=True), + ForeignKey("investment_accounts.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + symbol = Column(String(20), nullable=False, index=True) + description = Column(String(500), nullable=True) + + # Quantities and values + quantity = Column(Numeric(20, 8), nullable=False) + cost_basis = Column(Numeric(20, 2), nullable=False) + average_cost_basis = Column(Numeric(20, 2), nullable=True) + current_price = Column(Numeric(20, 2), nullable=True) + market_value = Column(Numeric(20, 2), nullable=True) + + # Performance metrics + unrealized_gain_loss = Column(Numeric(20, 2), nullable=True) + unrealized_gain_loss_pct = Column(Numeric(10, 4), nullable=True) + day_change = Column(Numeric(20, 2), nullable=True) + day_change_pct = Column(Numeric(10, 4), nullable=True) + previous_close = Column(Numeric(20, 2), nullable=True) + + # Classification + asset_type = Column(String(50), nullable=True) + sector = Column(String(100), nullable=True) + industry = Column(String(100), nullable=True) + geographic_region = Column(String(100), nullable=True) + + # Tax data + acquisition_date = Column(Date, nullable=True) + holding_period_days = Column(Integer, nullable=True) + is_long_term = Column(Boolean, default=False, nullable=False) + + # Metadata + price_as_of_timestamp = Column(TIMESTAMP(timezone=True), nullable=True) + extraction_confidence = Column(Numeric(3, 2), nullable=True) + metadata_ = Column("metadata", JSONB, nullable=True) + + # Relationships + account = relationship("InvestmentAccount", back_populates="holdings") + + +class InvestmentTransaction(BaseModel, TimestampMixin): + """Investment transaction model for tracking buy/sell history.""" + + __tablename__ = "investment_transactions" + + id = Column(UUID(as_uuid=True), 
primary_key=True, server_default=text('gen_random_uuid()'), index=True) + account_id = Column( + UUID(as_uuid=True), + ForeignKey("investment_accounts.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + symbol = Column(String(20), nullable=False, index=True) + transaction_type = Column(String(50), nullable=False) # buy, sell, dividend, etc. + transaction_date = Column(Date, nullable=False, index=True) + quantity = Column(Numeric(20, 8), nullable=False) + price = Column(Numeric(20, 2), nullable=False) + amount = Column(Numeric(20, 2), nullable=False) + fees = Column(Numeric(20, 2), nullable=True) + description = Column(String(500), nullable=True) + metadata_ = Column("metadata", JSONB, nullable=True) + + # Relationships + account = relationship("InvestmentAccount", back_populates="transactions") + + +class PortfolioAllocationTarget(BaseModel, TimestampMixin): + """Portfolio allocation targets for rebalancing recommendations.""" + + __tablename__ = "portfolio_allocation_targets" + + id = Column(UUID(as_uuid=True), primary_key=True, server_default=text('gen_random_uuid()'), index=True) + user_id = Column( + UUID(as_uuid=True), ForeignKey("user.id", ondelete="CASCADE"), nullable=False + ) + target_name = Column(String(100), nullable=False) + target_stocks_pct = Column(JSONB, nullable=True) + target_bonds_pct = Column(JSONB, nullable=True) + target_cash_pct = Column(JSONB, nullable=True) + target_other_pct = Column(JSONB, nullable=True) + metadata_ = Column("metadata", JSONB, nullable=True) + + class NewLLMConfig(BaseModel, TimestampMixin): """ New LLM configuration table that combines model settings with prompt configuration. diff --git a/financegpt_backend/app/parsers/__init__.py b/financegpt_backend/app/parsers/__init__.py index d3a9d28..f83b077 100644 --- a/financegpt_backend/app/parsers/__init__.py +++ b/financegpt_backend/app/parsers/__init__.py @@ -11,9 +11,9 @@ InvestmentTransaction, TransactionType, ) -from app.parsers.chase_parser import ChaseBankParser, ChaseCreditParser, ChaseParser -from app.parsers.discover_parser import DiscoverParser -from app.parsers.fidelity_parser import FidelityParser +# from app.parsers.chase_parser import ChaseBankParser, ChaseCreditParser, ChaseParser +# from app.parsers.discover_parser import DiscoverParser +# from app.parsers.fidelity_parser import FidelityParser from app.parsers.ofx_parser import OFXParser from app.parsers.parser_factory import ParserFactory @@ -27,11 +27,11 @@ "TransactionType", "AccountType", # Parsers - "ChaseParser", - "ChaseBankParser", - "ChaseCreditParser", - "FidelityParser", - "DiscoverParser", + # "ChaseParser", + # "ChaseBankParser", + # "ChaseCreditParser", + # "FidelityParser", + # "DiscoverParser", "OFXParser", # Factory "ParserFactory", diff --git a/financegpt_backend/app/parsers/base_financial_parser.py b/financegpt_backend/app/parsers/base_financial_parser.py index 46d652b..3fb862b 100644 --- a/financegpt_backend/app/parsers/base_financial_parser.py +++ b/financegpt_backend/app/parsers/base_financial_parser.py @@ -189,7 +189,12 @@ def __init__(self, institution_name: str): @abstractmethod async def parse_file( - self, file_content: bytes, filename: str + self, + file_content: bytes, + filename: str, + session=None, + user_id: str | None = None, + search_space_id: int | None = None, ) -> dict[str, Any]: """ Parse a financial statement file. 
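Since the abstract signature above now threads an optional DB session and user context through every parser, each subclass must accept the new keyword parameters even when it ignores them. A hedged sketch of a minimal conforming subclass (the class name and the row-passthrough behavior are illustrative, not taken from this PR):

```python
import csv
import io
from typing import Any

from app.parsers.base_financial_parser import BaseFinancialParser


class MinimalCSVParser(BaseFinancialParser):
    """Toy parser: accepts the widened signature, ignores the LLM hooks."""

    def __init__(self):
        super().__init__("Minimal CSV")

    async def parse_file(
        self,
        file_content: bytes,
        filename: str,
        session=None,                      # reserved for LLM-backed parsing
        user_id: str | None = None,        # user-specific LLM config lookup
        search_space_id: int | None = None,
    ) -> dict[str, Any]:
        rows = list(csv.DictReader(io.StringIO(file_content.decode("utf-8-sig"))))
        return {
            "transactions": rows,  # a real parser maps rows to BankTransaction
            "holdings": [],
            "balances": [],
            "metadata": {"institution": self.institution_name, "filename": filename},
        }
```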
@@ -197,6 +202,9 @@ async def parse_file( Args: file_content: Raw file bytes filename: Original filename (used to detect format) + session: Optional database session for LLM access + user_id: Optional user ID for user-specific LLM config + search_space_id: Optional search space ID for LLM config Returns: Dictionary containing: diff --git a/financegpt_backend/app/parsers/chase_parser.py b/financegpt_backend/app/parsers/chase_parser.py deleted file mode 100644 index 78ac167..0000000 --- a/financegpt_backend/app/parsers/chase_parser.py +++ /dev/null @@ -1,273 +0,0 @@ -""" -Chase Bank and Chase Credit Card CSV parser. - -Chase provides different CSV formats for: -- Checking/Savings accounts -- Credit cards -""" - -import csv -import io -import logging -from datetime import datetime -from typing import Any - -from app.parsers.base_financial_parser import ( - AccountBalance, - AccountType, - BankTransaction, - BaseFinancialParser, - TransactionType, -) - -logger = logging.getLogger(__name__) - - -class ChaseParser(BaseFinancialParser): - """Parser for Chase bank statements (checking/savings and credit cards).""" - - def __init__(self): - """Initialize Chase parser.""" - super().__init__("Chase") - - async def parse_file( - self, file_content: bytes, filename: str - ) -> dict[str, Any]: - """ - Parse Chase CSV file. - - Chase CSV formats: - - Checking/Savings: - Details,Posting Date,Description,Amount,Type,Balance,Check or Slip # - - Credit Card: - Transaction Date,Post Date,Description,Category,Type,Amount,Memo - - OR newer format: - Details,Posting Date,Description,Amount,Balance,Check or Slip # - - Args: - file_content: CSV file bytes - filename: Original filename - - Returns: - Parsed transactions and metadata - """ - try: - # Decode CSV - text_content = file_content.decode("utf-8-sig") # Handle BOM - csv_reader = csv.DictReader(io.StringIO(text_content)) - - # Detect format based on headers - fieldnames = csv_reader.fieldnames - if not fieldnames: - msg = "Empty CSV file" - raise ValueError(msg) - - is_credit_card = "Transaction Date" in fieldnames - is_checking = "Posting Date" in fieldnames and "Balance" in fieldnames - - transactions = [] - balances = [] - - if is_credit_card: - transactions = await self._parse_credit_card(csv_reader) - elif is_checking: - transactions, balances = await self._parse_checking(csv_reader) - else: - msg = f"Unknown Chase CSV format. Headers: {fieldnames}" - raise ValueError(msg) - - return { - "transactions": transactions, - "holdings": [], - "balances": balances, - "metadata": { - "institution": self.institution_name, - "format": "credit_card" if is_credit_card else "checking", - "filename": filename, - "parsed_at": datetime.now().isoformat(), - "transaction_count": len(transactions), - }, - } - - except Exception as e: - logger.error(f"Error parsing Chase CSV: {e}", exc_info=True) - raise - - async def _parse_checking( - self, csv_reader: csv.DictReader - ) -> tuple[list[BankTransaction], list[AccountBalance]]: - """ - Parse Chase checking/savings CSV. 
- - Format: Details,Posting Date,Description,Amount,Type,Balance,Check or Slip # - OR: Details,Posting Date,Description,Amount,Balance,Check or Slip # - """ - transactions = [] - balances = [] - - for row in csv_reader: - try: - # Skip header rows or empty rows - if not row.get("Description") or row.get("Description") == "Description": - continue - - # Parse date - date_str = row.get("Posting Date", "") - if not date_str: - continue - date = self._parse_date(date_str) - - # Parse amount - amount_str = row.get("Amount", "0") - amount = self._parse_amount(amount_str) - - # Description - description = row.get("Description", "").strip() - - # Type from Chase (DEBIT, CREDIT, CHECK, ACH_DEBIT, etc.) - chase_type = row.get("Type", "").upper() - - # Determine transaction type - if chase_type in ["DEBIT", "ACH_DEBIT", "ATM", "DSLIP"]: - trans_type = TransactionType.DEBIT - elif chase_type in ["CREDIT", "ACH_CREDIT", "DEP"]: - trans_type = TransactionType.CREDIT - elif chase_type == "CHECK": - trans_type = TransactionType.DEBIT - else: - trans_type = self._determine_transaction_type(amount, description) - - # Balance - balance_str = row.get("Balance", "") - balance = self._parse_amount(balance_str) if balance_str else None - - # Check number - check_number = row.get("Check or Slip #", "").strip() or None - - # Details/memo - details = row.get("Details", "").strip() or None - - transaction = BankTransaction( - date=date, - description=description, - amount=amount, - transaction_type=trans_type, - balance=balance, - check_number=check_number, - memo=details, - raw_data=dict(row), - ) - - transactions.append(transaction) - - # Add balance snapshot if available - if balance is not None: - account_balance = AccountBalance( - date=date, - account_type=AccountType.CHECKING, - account_name="Chase Checking", - balance=balance, - institution=self.institution_name, - raw_data=dict(row), - ) - balances.append(account_balance) - - except Exception as e: - logger.warning(f"Error parsing Chase checking row: {e}, row: {row}") - continue - - return transactions, balances - - async def _parse_credit_card( - self, csv_reader: csv.DictReader - ) -> list[BankTransaction]: - """ - Parse Chase credit card CSV. - - Format: Transaction Date,Post Date,Description,Category,Type,Amount,Memo - """ - transactions = [] - - for row in csv_reader: - try: - # Skip header or empty rows - if not row.get("Description") or row.get("Description") == "Description": - continue - - # Parse date (use Transaction Date, fall back to Post Date) - date_str = row.get("Transaction Date") or row.get("Post Date", "") - if not date_str: - continue - date = self._parse_date(date_str) - - # Parse amount (negative for charges, positive for payments/credits) - amount_str = row.get("Amount", "0") - amount = self._parse_amount(amount_str) - - # Description - description = row.get("Description", "").strip() - - # Category (Chase provides this) - category = row.get("Category", "").strip() or None - - # Type from Chase (Sale, Return, Payment, etc.) 
- chase_type = row.get("Type", "").upper() - - # Determine transaction type - if chase_type == "PAYMENT": - trans_type = TransactionType.PAYMENT - elif chase_type in ["SALE", "PURCHASE"]: - trans_type = TransactionType.PURCHASE - elif chase_type == "RETURN": - trans_type = TransactionType.CREDIT - elif chase_type == "FEE": - trans_type = TransactionType.FEE - else: - # For credit cards, negative amounts are charges - trans_type = ( - TransactionType.PURCHASE - if amount < 0 - else TransactionType.PAYMENT - ) - - # Memo - memo = row.get("Memo", "").strip() or None - - transaction = BankTransaction( - date=date, - description=description, - amount=amount, - transaction_type=trans_type, - category=category, - memo=memo, - raw_data=dict(row), - ) - - transactions.append(transaction) - - except Exception as e: - logger.warning(f"Error parsing Chase credit card row: {e}, row: {row}") - continue - - return transactions - - -class ChaseCreditParser(ChaseParser): - """Specialized parser for Chase Credit Cards.""" - - def __init__(self): - """Initialize Chase Credit parser.""" - super().__init__() - self.institution_name = "Chase Credit Card" - - -class ChaseBankParser(ChaseParser): - """Specialized parser for Chase Bank accounts.""" - - def __init__(self): - """Initialize Chase Bank parser.""" - super().__init__() - self.institution_name = "Chase Bank" diff --git a/financegpt_backend/app/parsers/discover_parser.py b/financegpt_backend/app/parsers/discover_parser.py deleted file mode 100644 index c175be3..0000000 --- a/financegpt_backend/app/parsers/discover_parser.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -Discover Credit Card CSV parser. -""" - -import csv -import io -import logging -from datetime import datetime -from typing import Any - -from app.parsers.base_financial_parser import ( - BankTransaction, - BaseFinancialParser, - TransactionType, -) - -logger = logging.getLogger(__name__) - - -class DiscoverParser(BaseFinancialParser): - """Parser for Discover credit card statements.""" - - def __init__(self): - """Initialize Discover parser.""" - super().__init__("Discover") - - async def parse_file( - self, file_content: bytes, filename: str - ) -> dict[str, Any]: - """ - Parse Discover CSV file. - - Discover CSV format: - Trans. Date,Post Date,Description,Amount,Category - - Args: - file_content: CSV file bytes - filename: Original filename - - Returns: - Parsed transactions and metadata - """ - try: - # Decode CSV - text_content = file_content.decode("utf-8-sig") # Handle BOM - csv_reader = csv.DictReader(io.StringIO(text_content)) - - fieldnames = csv_reader.fieldnames - if not fieldnames: - msg = "Empty CSV file" - raise ValueError(msg) - - transactions = await self._parse_transactions(csv_reader) - - return { - "transactions": transactions, - "holdings": [], - "balances": [], - "metadata": { - "institution": self.institution_name, - "format": "credit_card", - "filename": filename, - "parsed_at": datetime.now().isoformat(), - "transaction_count": len(transactions), - }, - } - - except Exception as e: - logger.error(f"Error parsing Discover CSV: {e}", exc_info=True) - raise - - async def _parse_transactions( - self, csv_reader: csv.DictReader - ) -> list[BankTransaction]: - """ - Parse Discover transaction CSV. - - Format: Trans. Date,Post Date,Description,Amount,Category - """ - transactions = [] - - for row in csv_reader: - try: - # Skip header or empty rows - if not row.get("Description") or row.get("Description") == "Description": - continue - - # Parse date (prefer Trans. 
Date, fall back to Post Date) - date_str = row.get("Trans. Date") or row.get("Post Date", "") - if not date_str: - continue - date = self._parse_date(date_str) - - # Parse amount (negative for purchases, positive for payments/credits) - amount_str = row.get("Amount", "0") - amount = self._parse_amount(amount_str) - - # Description - description = row.get("Description", "").strip() - - # Category (Discover provides this) - category = row.get("Category", "").strip() or None - - # Determine transaction type - # Negative amounts are purchases, positive are payments/credits - if amount < 0: - trans_type = TransactionType.PURCHASE - elif "payment" in description.lower(): - trans_type = TransactionType.PAYMENT - elif "cashback" in description.lower() or "reward" in description.lower(): - trans_type = TransactionType.CREDIT - else: - trans_type = TransactionType.CREDIT - - transaction = BankTransaction( - date=date, - description=description, - amount=amount, - transaction_type=trans_type, - category=category, - raw_data=dict(row), - ) - - transactions.append(transaction) - - except Exception as e: - logger.warning(f"Error parsing Discover row: {e}, row: {row}") - continue - - return transactions diff --git a/financegpt_backend/app/parsers/fidelity_parser.py b/financegpt_backend/app/parsers/fidelity_parser.py deleted file mode 100644 index f913dc4..0000000 --- a/financegpt_backend/app/parsers/fidelity_parser.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -Fidelity Investments CSV parser. - -Fidelity provides CSV exports for: -- Portfolio positions (holdings) -- Transaction history -- Account balances -""" - -import csv -import io -import logging -from datetime import datetime -from decimal import Decimal -from typing import Any - -from app.parsers.base_financial_parser import ( - AccountType, - BaseFinancialParser, - InvestmentHolding, - InvestmentTransaction, - TransactionType, -) - -logger = logging.getLogger(__name__) - - -class FidelityParser(BaseFinancialParser): - """Parser for Fidelity investment statements.""" - - def __init__(self): - """Initialize Fidelity parser.""" - super().__init__("Fidelity") - - async def parse_file( - self, file_content: bytes, filename: str - ) -> dict[str, Any]: - """ - Parse Fidelity CSV file. - - Fidelity CSV formats: - - Positions/Holdings: - Account Number,Account Name,Symbol,Description,Quantity,Last Price,Current Value, - Cost Basis Total,Gain/Loss Dollar,Gain/Loss Percent,... - - Transactions: - Run Date,Action,Symbol,Security Description,Security Type,Quantity,Price, - Commission,Fees,Accrued Interest,Amount,Settlement Date - - OR: - Date,Transaction Description,Amount,Balance - - Args: - file_content: CSV file bytes - filename: Original filename - - Returns: - Parsed holdings, transactions, and metadata - """ - try: - # Decode CSV - text_content = file_content.decode("utf-8-sig") # Handle BOM - csv_reader = csv.DictReader(io.StringIO(text_content)) - - fieldnames = csv_reader.fieldnames - if not fieldnames: - msg = "Empty CSV file" - raise ValueError(msg) - - # Detect format - is_positions = "Symbol" in fieldnames and "Quantity" in fieldnames - is_transactions = ( - "Action" in fieldnames or "Transaction Description" in fieldnames - ) - - holdings = [] - transactions = [] - balances = [] - - if is_positions: - holdings = await self._parse_positions(csv_reader) - elif is_transactions: - transactions = await self._parse_transactions(csv_reader) - else: - msg = f"Unknown Fidelity CSV format. 
Headers: {fieldnames}" - raise ValueError(msg) - - return { - "transactions": transactions, - "holdings": holdings, - "balances": balances, - "metadata": { - "institution": self.institution_name, - "format": "positions" if is_positions else "transactions", - "document_subtype": "investment_holdings" if is_positions else "investment_transactions", - "filename": filename, - "parsed_at": datetime.now().isoformat(), - "holding_count": len(holdings), - "transaction_count": len(transactions), - }, - } - - except Exception as e: - logger.error(f"Error parsing Fidelity CSV: {e}", exc_info=True) - raise - - async def _parse_positions( - self, csv_reader: csv.DictReader - ) -> list[InvestmentHolding]: - """ - Parse Fidelity positions/holdings CSV. - - Format includes: Symbol, Description, Quantity, Last Price, Current Value, - Cost Basis Total, Gain/Loss Dollar, Gain/Loss Percent - """ - holdings = [] - - for row in csv_reader: - try: - # Skip empty or header rows - symbol = row.get("Symbol", "").strip() - if not symbol or symbol == "Symbol": - continue - - # Parse basic info - description = row.get("Description", "").strip() - quantity = self._parse_amount(row.get("Quantity", "0")) - price = self._parse_amount(row.get("Last Price", "0")) - value = self._parse_amount(row.get("Current Value", "0")) - - # Parse cost basis and gains - cost_basis_str = row.get("Cost Basis Total", "") - cost_basis = ( - self._parse_amount(cost_basis_str) if cost_basis_str else None - ) - - # Try both "Gain/Loss Dollar" and "Total Gain/Loss Dollar" - gain_loss_str = row.get("Total Gain/Loss Dollar", "") or row.get("Gain/Loss Dollar", "") - gain_loss = self._parse_amount(gain_loss_str) if gain_loss_str else None - - # Try both "Gain/Loss Percent" and "Total Gain/Loss Percent" - gain_loss_pct_str = row.get("Total Gain/Loss Percent", "") or row.get("Gain/Loss Percent", "") - gain_loss_percent = None - if gain_loss_pct_str: - # Remove % sign and parse - pct_cleaned = gain_loss_pct_str.replace("%", "").strip() - if pct_cleaned: - gain_loss_percent = self._parse_amount(pct_cleaned) - - # Determine account type from Account Name if available - account_name = row.get("Account Name", "").upper() - account_type = self._determine_account_type(account_name) - - # Determine asset type - asset_type = row.get("Security Type", "").lower() or None - if not asset_type: - # Guess from symbol - if len(symbol) <= 5: - asset_type = "stock" - else: - asset_type = "mutual_fund" - - holding = InvestmentHolding( - symbol=symbol, - description=description, - quantity=quantity, - price=price, - value=value, - cost_basis=cost_basis, - gain_loss=gain_loss, - gain_loss_percent=gain_loss_percent, - account_type=account_type, - asset_type=asset_type, - raw_data=dict(row), - ) - - holdings.append(holding) - - except Exception as e: - logger.warning(f"Error parsing Fidelity position row: {e}, row: {row}") - continue - - return holdings - - async def _parse_transactions( - self, csv_reader: csv.DictReader - ) -> list[InvestmentTransaction]: - """ - Parse Fidelity transaction history CSV. 
- - Format: Run Date, Action, Symbol, Security Description, Quantity, Price, Amount - """ - transactions = [] - - for row in csv_reader: - try: - # Check if it's detailed transaction format or simple format - if "Action" in row: - transaction = await self._parse_detailed_transaction(row) - else: - # Simple transaction format (Date, Description, Amount) - transaction = await self._parse_simple_transaction(row) - - if transaction: - transactions.append(transaction) - - except Exception as e: - logger.warning(f"Error parsing Fidelity transaction row: {e}, row: {row}") - continue - - return transactions - - async def _parse_detailed_transaction( - self, row: dict - ) -> InvestmentTransaction | None: - """Parse detailed Fidelity transaction with Action, Symbol, etc.""" - # Parse date - date_str = row.get("Run Date") or row.get("Date", "") - if not date_str: - return None - date = self._parse_date(date_str) - - # Action (BUY, SELL, DIVIDEND, etc.) - action = row.get("Action", "").upper() - symbol = row.get("Symbol", "").strip() - description = row.get("Security Description", "").strip() - - # Quantity and price - quantity = self._parse_amount(row.get("Quantity", "0")) - price = self._parse_amount(row.get("Price", "0")) - amount = self._parse_amount(row.get("Amount", "0")) - - # Fees - commission = self._parse_amount(row.get("Commission", "0")) - fees_field = self._parse_amount(row.get("Fees", "0")) - total_fees = commission + fees_field - - # Map Fidelity action to TransactionType - trans_type = self._map_fidelity_action(action) - - return InvestmentTransaction( - date=date, - symbol=symbol, - description=description, - transaction_type=trans_type, - quantity=quantity, - price=price, - amount=amount, - fees=total_fees, - raw_data=dict(row), - ) - - async def _parse_simple_transaction(self, row: dict) -> InvestmentTransaction | None: - """Parse simple Fidelity transaction (Date, Description, Amount).""" - date_str = row.get("Date", "") - if not date_str: - return None - date = self._parse_date(date_str) - - description = row.get("Transaction Description", "").strip() - amount = self._parse_amount(row.get("Amount", "0")) - - # Try to extract symbol from description - symbol = "" - desc_upper = description.upper() - # Common patterns: "YOU BOUGHT AAPL" or "DIVIDEND MSFT" - words = desc_upper.split() - for word in words: - if len(word) <= 5 and word.isalpha(): - symbol = word - break - - # Determine type from description - trans_type = self._determine_transaction_type(amount, description) - - # Estimate quantity if possible (amount / price, but we don't have price) - quantity = Decimal("0") - if "bought" in desc_upper or "sold" in desc_upper: - quantity = Decimal("1") # Placeholder - - return InvestmentTransaction( - date=date, - symbol=symbol, - description=description, - transaction_type=trans_type, - quantity=quantity, - price=Decimal("0"), - amount=amount, - raw_data=dict(row), - ) - - def _map_fidelity_action(self, action: str) -> TransactionType: - """Map Fidelity action code to TransactionType.""" - action_map = { - "BUY": TransactionType.BUY, - "BOUGHT": TransactionType.BUY, - "SELL": TransactionType.SELL, - "SOLD": TransactionType.SELL, - "DIVIDEND": TransactionType.DIVIDEND, - "DIV": TransactionType.DIVIDEND, - "REINVEST": TransactionType.REINVEST, - "REINVESTMENT": TransactionType.REINVEST, - "INTEREST": TransactionType.INTEREST, - "FEE": TransactionType.FEE, - "SPLIT": TransactionType.SPLIT, - } - - return action_map.get(action, TransactionType.BUY) - - def _determine_account_type(self, 
account_name: str) -> AccountType: - """Determine account type from Fidelity account name.""" - name_upper = account_name.upper() - - if "ROTH" in name_upper: - return AccountType.ROTH_IRA - if "IRA" in name_upper and "ROTH" not in name_upper: - return AccountType.TRADITIONAL_IRA - if "401" in name_upper or "401K" in name_upper: - return AccountType.K401 - if "BROKERAGE" in name_upper or "INDIVIDUAL" in name_upper: - return AccountType.BROKERAGE - - return AccountType.BROKERAGE # Default diff --git a/financegpt_backend/app/parsers/llm_csv_parser.py b/financegpt_backend/app/parsers/llm_csv_parser.py new file mode 100644 index 0000000..e0a116c --- /dev/null +++ b/financegpt_backend/app/parsers/llm_csv_parser.py @@ -0,0 +1,532 @@ +""" +LLM-based CSV parser for unknown formats. +Uses LLM to intelligently extract holdings from any CSV structure. +""" + +import csv +import io +import json +import logging +from datetime import datetime +from decimal import Decimal +from typing import Any + +from app.parsers.base_financial_parser import ( + BaseFinancialParser, + BankTransaction, + InvestmentHolding, + TransactionType, +) + +logger = logging.getLogger(__name__) + + +class LLMCSVParser(BaseFinancialParser): + """Parser that uses LLM to understand and extract data from any CSV format. + + Privacy-first approach: Sends only sanitized samples to LLM for schema detection, + then applies schema locally to extract actual data without exposing it to LLM. + """ + + def __init__(self): + """Initialize LLM CSV parser.""" + super().__init__("LLM CSV Parser") + + async def parse_file( + self, + file_content: bytes, + filename: str, + session=None, + user_id: str | None = None, + search_space_id: int | None = None, + ) -> dict[str, Any]: + """ + Parse any CSV using LLM to understand the structure. + + Args: + file_content: CSV file bytes + filename: Original filename + session: Optional database session for LLM access + user_id: Optional user ID for user-specific LLM config + search_space_id: Optional search space ID for LLM config + + Returns: + Parsed holdings and metadata + """ + try: + # Decode CSV + text_content = file_content.decode("utf-8-sig") + csv_reader = csv.DictReader(io.StringIO(text_content)) + + # Get headers and first few rows for LLM analysis + rows = list(csv_reader) + if not rows: + raise ValueError("CSV file is empty") + + headers = list(rows[0].keys()) + + # Use LLM to extract data + holdings, transactions = await self._extract_data_with_llm( + headers, rows, filename, session, user_id, search_space_id + ) + + return { + "transactions": transactions, + "holdings": holdings, + "balances": [], + "metadata": { + "institution": "Unknown", + "format": "llm_extracted", + "document_subtype": "investment_holdings" if holdings else "bank_transactions", + "filename": filename, + "parsed_at": datetime.now().isoformat(), + "holding_count": len(holdings), + "transaction_count": len(transactions), + }, + } + + except Exception as e: + logger.error("Error in LLM CSV parsing: %s", e, exc_info=True) + raise + + async def _extract_data_with_llm( + self, + headers: list[str], + rows: list[dict], + filename: str, + session=None, + user_id: str | None = None, + search_space_id: int | None = None, + ) -> tuple[list[InvestmentHolding], list[BankTransaction]]: + """ + Use LLM to intelligently extract holdings OR transactions from CSV. 
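+
+        Two-phase, privacy-preserving flow: the LLM sees only sanitized
+        sample rows and returns a column-mapping schema; that schema is then
+        applied locally to every row, so real values never leave the process.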
+ + Args: + headers: CSV column headers + rows: All CSV rows as dicts + filename: Original filename for context + session: Optional database session for LLM access + user_id: Optional user ID for user-specific LLM config + search_space_id: Optional search space ID for LLM config + + Returns: + Tuple of (holdings list, transactions list) + """ + # Try to get user's configured LLM + llm = None + if session and user_id and search_space_id: + try: + from app.services.llm_service import get_user_long_context_llm + + llm = await get_user_long_context_llm(session, user_id, search_space_id) + if llm: + logger.info("Using user's configured LLM for CSV parsing") + except Exception as e: + logger.warning( + "Failed to get user LLM config, will use fallback parsing: %s", e + ) + + # If no LLM available, use fallback + if not llm: + logger.info( + "No LLM available for CSV parsing, using fallback heuristic approach" + ) + return await self._fallback_basic_parse(rows) + + # PRIVACY-FIRST: Only send sanitized samples to LLM for schema detection + # Real data is extracted locally without exposing it to the LLM + num_rows = len(rows) + num_samples = min(3, num_rows) + + # Sanitize sample rows - replace real values with type indicators + sanitized_samples = [ + {col: self._sanitize_value(row[col]) for col in row} + for row in rows[:num_samples] + ] + + sample_data = json.dumps(sanitized_samples, indent=2, default=str) + logger.info(f"Sending {num_samples} sanitized samples to LLM for schema detection (protecting {num_rows} rows of real data)") + + prompt = f"""You are analyzing a financial CSV file structure. Your job is to determine the file type and create a SCHEMA MAPPING, NOT extract actual data. + +PRIVACY NOTE: You are receiving sanitized sample data. Real values are masked with type indicators like [DECIMAL], [TEXT], [DATE:format], [SYMBOL]. 
+ +CSV Headers: {headers} + +Sanitized Sample Rows (real data is protected): +{sample_data} + +STEP 1: Determine file type +- If headers contain "Transaction Type", "Bought", "Sold", "Payment", "Purchase", or transaction descriptions → This is a TRANSACTION HISTORY file +- If headers contain "Current Value", "Market Value", "Shares", "Position" without transaction types → This is a HOLDINGS/POSITIONS file + +STEP 2: Create a SCHEMA MAPPING (not data extraction) + +For TRANSACTION HISTORY files, provide mapping for: +- date: {{"column": "", "format": ""}} +- description: {{"column": ""}} +- amount: {{"column": "", "sign_convention": "negative_for_debits|positive_for_debits"}} +- transaction_type: {{"column": "", "default": "DEBIT"}} +- category: {{"column": ""}} +- merchant: {{"column": ""}} + +For HOLDINGS/POSITIONS files, provide mapping for: +- symbol: {{"column": ""}} +- quantity: {{"column": ""}} +- price: {{"column": ""}} +- market_value: {{"column": ""}} +- cost_basis: {{"column": "", "calculation": ""}} +- gain_loss: {{"column": ""}} + +Return ONLY the schema mapping as JSON: + +Example for holdings: +{{"file_type": "holdings", "schema": {{ + "symbol": {{"column": "Symbol"}}, + "quantity": {{"column": "Quantity"}}, + "price": {{"column": "Last Price"}}, + "market_value": {{"column": "Current Value"}}, + "cost_basis": {{"calculation": "market_value - gain_loss", "uses_columns": ["Current Value", "Total Gain/Loss Dollar"]}} +}}}} + +Example for transactions: +{{"file_type": "transactions", "schema": {{ + "date": {{"column": "Transaction Date", "format": "MM/DD/YYYY"}}, + "description": {{"column": "Description"}}, + "amount": {{"column": "Amount", "sign_convention": "negative_for_debits"}}, + "transaction_type": {{"default": "DEBIT"}} +}}}}""" + + try: + # Call LLM + response = await llm.ainvoke(prompt) + + # Extract JSON from response + response_text = response.content if hasattr(response, 'content') else str(response) + + # Find JSON object in response + start_idx = response_text.find('{') + end_idx = response_text.rfind('}') + 1 + + if start_idx == -1 or end_idx == 0: + logger.error("LLM response doesn't contain JSON object: %s", response_text) + raise ValueError("LLM failed to return valid JSON") + + json_str = response_text[start_idx:end_idx] + parsed_response = json.loads(json_str) + + file_type = parsed_response.get("file_type", "unknown") + schema = parsed_response.get("schema", {}) + + logger.info(f"LLM detected file type: {file_type}, applying schema to {num_rows} rows locally (privacy-preserving)") + + # Apply schema locally to ALL rows without sending to LLM + if file_type == "holdings": + holdings = self._apply_holdings_schema_locally(schema, rows) + return holdings, [] + elif file_type == "transactions": + transactions = self._apply_transactions_schema_locally(schema, rows) + return [], transactions + else: + logger.warning(f"Unknown file type '{file_type}', attempting fallback") + return await self._fallback_basic_parse(rows) + + except Exception as e: + logger.error("LLM extraction failed: %s", e, exc_info=True) + # Fallback: try basic parsing + return await self._fallback_basic_parse(rows) + + def _sanitize_value(self, value: Any) -> str: + """ + Sanitize a CSV value to protect privacy while preserving type information. 
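+
+        For example, "$1,234.56" becomes "[DECIMAL]" and "AAPL" becomes
+        "[SYMBOL]"; only these markers are ever included in the LLM prompt.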
+ + Args: + value: Raw value from CSV + + Returns: + Sanitized type indicator (e.g., "[DECIMAL]", "[TEXT]", "[DATE:MM/DD/YYYY]") + """ + if value is None or value == "": + return "[EMPTY]" + + value_str = str(value).strip() + + # Check for decimal/currency + clean_val = value_str.replace("$", "").replace(",", "").replace("(", "-").replace(")", "").strip() + try: + float(clean_val) + if "." in clean_val: + return "[DECIMAL]" + return "[INTEGER]" + except ValueError: + pass + + # Check for date patterns + import re + date_patterns = [ + (r'^\d{1,2}/\d{1,2}/\d{4}$', '[DATE:MM/DD/YYYY]'), + (r'^\d{4}-\d{2}-\d{2}$', '[DATE:YYYY-MM-DD]'), + (r'^\d{1,2}-\d{1,2}-\d{4}$', '[DATE:MM-DD-YYYY]'), + (r'^[A-Za-z]{3} \d{1,2}, \d{4}$', '[DATE:Mon DD, YYYY]'), + ] + for pattern, indicator in date_patterns: + if re.match(pattern, value_str): + return indicator + + # Check for stock symbols (2-5 uppercase letters) + if re.match(r'^[A-Z]{2,5}$', value_str): + return "[SYMBOL]" + + # Default to text + return "[TEXT]" + + def _apply_holdings_schema_locally(self, schema: dict, rows: list[dict]) -> list[InvestmentHolding]: + """ + Apply LLM-generated schema to extract holdings data locally without LLM. + + Args: + schema: Schema mapping from LLM + rows: All CSV rows + + Returns: + List of InvestmentHolding objects extracted locally + """ + holdings = [] + + for row in rows: + try: + # Extract symbol + symbol_col = schema.get("symbol", {}).get("column") + if not symbol_col or symbol_col not in row: + continue + symbol = str(row[symbol_col]).strip().upper() + if not symbol or symbol in ["", "N/A", "Total", "TOTAL"]: + continue + + # Extract quantity + quantity_col = schema.get("quantity", {}).get("column") + if not quantity_col or quantity_col not in row: + continue + quantity_str = str(row[quantity_col]).replace(",", "").strip() + if not quantity_str: + continue + quantity = Decimal(quantity_str) + if quantity <= 0: + continue + + # Extract optional fields + price = None + price_col = schema.get("price", {}).get("column") + if price_col and price_col in row and row[price_col]: + price = Decimal(str(row[price_col]).replace("$", "").replace(",", "").strip()) + + market_value = None + mv_col = schema.get("market_value", {}).get("column") + if mv_col and mv_col in row and row[mv_col]: + market_value = Decimal(str(row[mv_col]).replace("$", "").replace(",", "").strip()) + + # Handle cost_basis - might need calculation + cost_basis = None + cb_config = schema.get("cost_basis", {}) + if "column" in cb_config: + cb_col = cb_config["column"] + if cb_col in row and row[cb_col]: + cost_basis = Decimal(str(row[cb_col]).replace("$", "").replace(",", "").strip()) + elif "calculation" in cb_config: + # Handle calculated cost_basis (e.g., market_value - gain_loss) + calc = cb_config["calculation"] + if "market_value - gain_loss" in calc: + uses_cols = cb_config.get("uses_columns", []) + if len(uses_cols) >= 2 and all(c in row for c in uses_cols): + mv = Decimal(str(row[uses_cols[0]]).replace("$", "").replace(",", "").strip()) + gl_str = str(row[uses_cols[1]]).replace("$", "").replace(",", "").replace("(", "-").replace(")", "").strip() + gl = Decimal(gl_str) + cost_basis = mv - gl + + holding = InvestmentHolding( + symbol=symbol, + description=row.get(schema.get("description", {}).get("column", ""), ""), + quantity=quantity, + price=price, + value=market_value, + cost_basis=cost_basis, + gain_loss=None, + gain_loss_percent=None, + account_type=None, + asset_type="stock" if len(symbol) <= 5 else "mutual_fund", + raw_data=dict(row), 
+ ) + + holdings.append(holding) + + except (ValueError, TypeError, KeyError) as e: + logger.debug(f"Skipping row in holdings extraction: {e}") + continue + + logger.info(f"Extracted {len(holdings)} holdings locally (privacy-preserving)") + return holdings + + def _apply_transactions_schema_locally(self, schema: dict, rows: list[dict]) -> list[BankTransaction]: + """ + Apply LLM-generated schema to extract transactions data locally without LLM. + + Args: + schema: Schema mapping from LLM + rows: All CSV rows + + Returns: + List of BankTransaction objects extracted locally + """ + transactions = [] + + for row in rows: + try: + # Extract date + date_col = schema.get("date", {}).get("column") + if not date_col or date_col not in row or not row[date_col]: + continue + + date_str = str(row[date_col]).strip() + date_format = schema.get("date", {}).get("format", "MM/DD/YYYY") + + # Parse date based on detected format + from datetime import datetime + format_map = { + "MM/DD/YYYY": "%m/%d/%Y", + "YYYY-MM-DD": "%Y-%m-%d", + "MM-DD-YYYY": "%m-%d-%Y", + "DD/MM/YYYY": "%d/%m/%Y", + } + python_format = format_map.get(date_format, "%m/%d/%Y") + date = datetime.strptime(date_str, python_format).date() + + # Extract description + desc_col = schema.get("description", {}).get("column") + if not desc_col or desc_col not in row: + continue + description = str(row[desc_col]).strip() + if not description: + continue + + # Extract amount + amount_col = schema.get("amount", {}).get("column") + if not amount_col or amount_col not in row or not row[amount_col]: + continue + + amount_str = str(row[amount_col]).replace("$", "").replace(",", "").replace("(", "-").replace(")", "").strip() + amount = Decimal(amount_str) + + # Determine transaction type + sign_convention = schema.get("amount", {}).get("sign_convention", "negative_for_debits") + txn_type_col = schema.get("transaction_type", {}).get("column") + + if txn_type_col and txn_type_col in row and row[txn_type_col]: + txn_type_str = str(row[txn_type_col]).upper().strip() + try: + txn_type = TransactionType[txn_type_str] + except KeyError: + # Try to map common transaction types + type_mapping = { + "SALE": TransactionType.PURCHASE, + "ACH": TransactionType.TRANSFER, + "WITHDRAWAL": TransactionType.WITHDRAWAL, + "DEPOSIT": TransactionType.DEPOSIT, + } + txn_type = type_mapping.get(txn_type_str, + TransactionType.CREDIT if amount > 0 else TransactionType.DEBIT) + else: + # Use default or infer from amount + txn_type = TransactionType.CREDIT if amount > 0 else TransactionType.DEBIT + + # Extract optional fields + category = None + category_col = schema.get("category", {}).get("column") + if category_col and category_col in row: + category = str(row[category_col]).strip() or None + + merchant = None + merchant_col = schema.get("merchant", {}).get("column") + if merchant_col and merchant_col in row: + merchant = str(row[merchant_col]).strip() or None + + transaction = BankTransaction( + date=date, + description=description, + amount=amount, + transaction_type=txn_type, + balance=None, + category=category, + merchant=merchant, + account_last_4=None, + check_number=None, + memo=None, + raw_data=dict(row), + ) + + transactions.append(transaction) + + except (ValueError, TypeError, KeyError) as e: + logger.debug(f"Skipping row in transaction extraction: {e}") + continue + + logger.info(f"Extracted {len(transactions)} transactions locally (privacy-preserving)") + return transactions + + async def _fallback_basic_parse(self, rows: list[dict]) -> 
tuple[list[InvestmentHolding], list[BankTransaction]]: + """ + Fallback parser if LLM fails - uses simple heuristics. + + Args: + rows: CSV rows as dicts + + Returns: + Tuple of (holdings list, transactions list) + """ + logger.info("Using fallback basic parsing") + holdings = [] + + for row in rows: + # Try to find symbol column (common names) + symbol = None + for key in ["Symbol", "Ticker", "Stock Symbol", "Security"]: + value = row.get(key) + if value: + symbol = value.strip().upper() + break + + if not symbol: + continue + + # Try to find quantity + quantity = None + for key in ["Quantity", "Shares", "Qty", "Units"]: + value = row.get(key) + if value: + try: + quantity = Decimal(str(value).replace(",", "")) + break + except (ValueError, TypeError): + continue + + if not quantity or quantity <= 0: + continue + + # Basic holding with just symbol and quantity + holding = InvestmentHolding( + symbol=symbol, + description="", + quantity=quantity, + price=None, + value=None, + cost_basis=None, + gain_loss=None, + gain_loss_percent=None, + account_type=None, + asset_type="stock", + raw_data=dict(row), + ) + + holdings.append(holding) + + logger.info("Fallback parser extracted %d holdings", len(holdings)) + return holdings, [] # Fallback only handles holdings, not transactions diff --git a/financegpt_backend/app/parsers/ofx_parser.py b/financegpt_backend/app/parsers/ofx_parser.py index 78df9d5..7bb7719 100644 --- a/financegpt_backend/app/parsers/ofx_parser.py +++ b/financegpt_backend/app/parsers/ofx_parser.py @@ -31,7 +31,12 @@ def __init__(self): super().__init__("OFX") async def parse_file( - self, file_content: bytes, filename: str + self, + file_content: bytes, + filename: str, + session=None, + user_id: str | None = None, + search_space_id: int | None = None, ) -> dict[str, Any]: """ Parse OFX/QFX file. 
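That is the whole privacy-preserving pipeline: sanitize, get a schema from the LLM, extract locally, with a heuristic fallback when no LLM is configured. A minimal sketch of invoking it (illustrative only; with `session=None` the fallback path runs and nothing is sent to an LLM):

```python
# Sketch, not part of the diff: exercising LLMCSVParser directly.
import asyncio

from app.parsers.llm_csv_parser import LLMCSVParser


async def demo() -> None:
    csv_bytes = b"Symbol,Quantity,Last Price,Current Value\nAAPL,10,150.00,1500.00\n"
    # session=None -> no LLM is resolved, so _fallback_basic_parse handles the rows.
    result = await LLMCSVParser().parse_file(csv_bytes, "holdings.csv", session=None)
    assert result["metadata"]["holding_count"] == 1
    assert result["metadata"]["document_subtype"] == "investment_holdings"


asyncio.run(demo())
```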
diff --git a/financegpt_backend/app/parsers/parser_factory.py b/financegpt_backend/app/parsers/parser_factory.py index 15c9844..f7c8adb 100644 --- a/financegpt_backend/app/parsers/parser_factory.py +++ b/financegpt_backend/app/parsers/parser_factory.py @@ -7,9 +7,7 @@ from app.db import SearchSourceConnectorType from app.parsers.base_financial_parser import BaseFinancialParser -from app.parsers.chase_parser import ChaseBankParser, ChaseCreditParser, ChaseParser -from app.parsers.discover_parser import DiscoverParser -from app.parsers.fidelity_parser import FidelityParser +from app.parsers.llm_csv_parser import LLMCSVParser from app.parsers.ofx_parser import OFXParser from app.parsers.pdf_statement_parser import PDFStatementParser @@ -36,14 +34,10 @@ def get_parser( ValueError: If connector type not supported """ parser_map = { - SearchSourceConnectorType.CHASE_BANK: ChaseBankParser(), - SearchSourceConnectorType.CHASE_CREDIT: ChaseCreditParser(), - SearchSourceConnectorType.FIDELITY_INVESTMENTS: FidelityParser(), - SearchSourceConnectorType.DISCOVER_CREDIT: DiscoverParser(), SearchSourceConnectorType.OFX_UPLOAD: OFXParser(), - # Generic parsers - SearchSourceConnectorType.GENERIC_BANK_CSV: ChaseParser(), # Use Chase format as default - SearchSourceConnectorType.GENERIC_INVESTMENT_CSV: FidelityParser(), # Use Fidelity format + # LLM parser handles ALL CSV formats (holdings and transactions) + SearchSourceConnectorType.GENERIC_INVESTMENT_CSV: LLMCSVParser(), + SearchSourceConnectorType.GENERIC_BANK_CSV: LLMCSVParser(), } parser = parser_map.get(connector_type) @@ -90,30 +84,9 @@ def detect_format(file_content: bytes, filename: str) -> SearchSourceConnectorTy if filename_lower.endswith((".ofx", ".qfx")): return SearchSourceConnectorType.OFX_UPLOAD - # For CSV, try to detect from content + # Use LLM parser for ALL CSV files (privacy-first universal parser) if filename_lower.endswith(".csv"): - try: - # Decode first few lines - text_preview = file_content[:1000].decode("utf-8-sig") - - # Check for institution-specific headers - if "Chase" in text_preview or "Transaction Date,Post Date" in text_preview: - if "Transaction Date" in text_preview: - return SearchSourceConnectorType.CHASE_CREDIT - return SearchSourceConnectorType.CHASE_BANK - - if "Fidelity" in text_preview or "Symbol,Description,Quantity" in text_preview: - return SearchSourceConnectorType.FIDELITY_INVESTMENTS - - if "Discover" in text_preview or "Trans. Date,Post Date" in text_preview: - return SearchSourceConnectorType.DISCOVER_CREDIT - - # Default to generic bank CSV - return SearchSourceConnectorType.GENERIC_BANK_CSV - - except Exception as e: - logger.warning(f"Error detecting CSV format: {e}") - return SearchSourceConnectorType.GENERIC_BANK_CSV + return SearchSourceConnectorType.GENERIC_INVESTMENT_CSV # Not a recognized financial file format return None diff --git a/financegpt_backend/app/parsers/pdf_statement_parser.py b/financegpt_backend/app/parsers/pdf_statement_parser.py index 63894de..5e0d528 100644 --- a/financegpt_backend/app/parsers/pdf_statement_parser.py +++ b/financegpt_backend/app/parsers/pdf_statement_parser.py @@ -26,7 +26,12 @@ def __init__(self): super().__init__("PDF Statement") async def parse_file( - self, file_content: bytes, filename: str + self, + file_content: bytes, + filename: str, + session=None, + user_id: str | None = None, + search_space_id: int | None = None, ) -> dict[str, Any]: """ Parse PDF bank statement. 
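With the institution-specific parsers removed, routing collapses to two cases. A sketch of the resulting behavior, assuming `detect_format` and `get_parser` are exposed as static methods on `ParserFactory` as the hunk context suggests:

```python
from app.db import SearchSourceConnectorType
from app.parsers.parser_factory import ParserFactory

# Any .csv now maps to the generic LLM-backed connector type; no content sniffing.
ctype = ParserFactory.detect_format(b"Symbol,Quantity\nAAPL,10\n", "export.csv")
assert ctype == SearchSourceConnectorType.GENERIC_INVESTMENT_CSV

# Both generic CSV connector types resolve to the same LLMCSVParser.
parser = ParserFactory.get_parser(ctype)

# OFX/QFX uploads keep their dedicated parser.
assert ParserFactory.detect_format(b"OFXHEADER:100", "stmt.qfx") == SearchSourceConnectorType.OFX_UPLOAD
```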
diff --git a/financegpt_backend/app/routes/documents_routes.py b/financegpt_backend/app/routes/documents_routes.py index 4d17df6..9bae45b 100644 --- a/financegpt_backend/app/routes/documents_routes.py +++ b/financegpt_backend/app/routes/documents_routes.py @@ -796,6 +796,28 @@ async def delete_document( "You don't have permission to delete documents in this search space", ) + # If this is a financial document with associated investment holdings, delete them too + try: + if document.document_metadata and document.document_metadata.get("is_financial_document"): + from app.db import InvestmentAccount + + # Find accounts created from this file (match by filename in account_name) + filename = document.document_metadata.get("FILE_NAME") + if filename: + # Delete accounts with matching filename (cascade will delete holdings) + result = await session.execute( + select(InvestmentAccount).where( + InvestmentAccount.user_id == user.id, + InvestmentAccount.account_name == filename + ) + ) + accounts = result.scalars().all() + for account in accounts: + await session.delete(account) + except Exception as e: + # Log but don't fail the whole delete if investment cleanup fails + print(f"Warning: Failed to clean up investment holdings: {e}") + await session.delete(document) await session.commit() return {"message": "Document deleted successfully"} diff --git a/financegpt_backend/app/schemas/investments.py b/financegpt_backend/app/schemas/investments.py new file mode 100644 index 0000000..9df4711 --- /dev/null +++ b/financegpt_backend/app/schemas/investments.py @@ -0,0 +1,305 @@ +"""Pydantic schemas for investment holdings and accounts.""" +from datetime import date, datetime +from decimal import Decimal +from enum import Enum +from typing import Any +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, Field + + +class AssetType(str, Enum): + """Investment asset types.""" + STOCK = "stock" + BOND = "bond" + MUTUAL_FUND = "mutual_fund" + ETF = "etf" + CRYPTO = "crypto" + OPTION = "option" + OTHER = "other" + + +class AccountTaxType(str, Enum): + """Account tax treatment types.""" + TAXABLE = "taxable" + TAX_DEFERRED = "tax_deferred" # Traditional IRA, 401k + TAX_FREE = "tax_free" # Roth IRA, Roth 401k + + +class SourceType(str, Enum): + """Data source types.""" + PLAID = "plaid" + DOCUMENT = "document" + MANUAL = "manual" + + +class TransactionType(str, Enum): + """Transaction types.""" + BUY = "buy" + SELL = "sell" + DIVIDEND = "dividend" + INTEREST = "interest" + FEE = "fee" + TRANSFER_IN = "transfer_in" + TRANSFER_OUT = "transfer_out" + + +# ============================================================================ +# Investment Account Schemas +# ============================================================================ + +class InvestmentAccountBase(BaseModel): + """Base schema for investment accounts.""" + account_name: str + account_type: str # brokerage, IRA, 401k, etc. 
+ account_tax_type: AccountTaxType + account_number: str | None = None + institution: str | None = None + total_value: Decimal | None = None + cash_balance: Decimal | None = None + + +class InvestmentAccountCreate(InvestmentAccountBase): + """Schema for creating an investment account.""" + source_type: SourceType = SourceType.MANUAL + source_id: str | None = None + metadata: dict[str, Any] | None = None + + +class InvestmentAccountUpdate(BaseModel): + """Schema for updating an investment account.""" + account_name: str | None = None + total_value: Decimal | None = None + cash_balance: Decimal | None = None + metadata: dict[str, Any] | None = None + + +class InvestmentAccount(InvestmentAccountBase): + """Full investment account schema.""" + model_config = ConfigDict(from_attributes=True) + + id: UUID + user_id: UUID + source_type: SourceType + source_id: str | None = None + last_synced_at: datetime | None = None + metadata: dict[str, Any] | None = None + created_at: datetime + updated_at: datetime + + +# ============================================================================ +# Investment Holding Schemas +# ============================================================================ + +class InvestmentHoldingBase(BaseModel): + """Base schema for investment holdings.""" + symbol: str = Field(..., max_length=20) + description: str | None = None + quantity: Decimal + cost_basis: Decimal + average_cost_basis: Decimal | None = None + + +class InvestmentHoldingCreate(InvestmentHoldingBase): + """Schema for creating an investment holding (minimal user input).""" + account_id: UUID + + +class InvestmentHoldingEnriched(InvestmentHoldingBase): + """Enriched holding data from Yahoo Finance or LLM.""" + current_price: Decimal | None = None + market_value: Decimal | None = None + previous_close: Decimal | None = None + day_change: Decimal | None = None + day_change_pct: Decimal | None = None + unrealized_gain_loss: Decimal | None = None + unrealized_gain_loss_pct: Decimal | None = None + + asset_type: AssetType | None = None + sector: str | None = None + industry: str | None = None + geographic_region: str | None = None + + price_as_of_timestamp: datetime | None = None + extraction_confidence: Decimal | None = Field(None, ge=0, le=1) + + +class InvestmentHolding(InvestmentHoldingEnriched): + """Full investment holding schema.""" + model_config = ConfigDict(from_attributes=True) + + id: UUID + account_id: UUID + acquisition_date: date | None = None + holding_period_days: int | None = None + is_long_term: bool = False + metadata: dict[str, Any] | None = None + created_at: datetime + updated_at: datetime + + +# ============================================================================ +# Transaction Schemas +# ============================================================================ + +class InvestmentTransactionBase(BaseModel): + """Base schema for investment transactions.""" + symbol: str = Field(..., max_length=20) + transaction_type: TransactionType + transaction_date: date + quantity: Decimal + price: Decimal + amount: Decimal + fees: Decimal | None = None + description: str | None = None + + +class InvestmentTransactionCreate(InvestmentTransactionBase): + """Schema for creating a transaction.""" + account_id: UUID + metadata: dict[str, Any] | None = None + + +class InvestmentTransaction(InvestmentTransactionBase): + """Full transaction schema.""" + model_config = ConfigDict(from_attributes=True) + + id: UUID + account_id: UUID + metadata: dict[str, Any] | None = None + created_at: datetime 
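+
+
+# Illustrative only (not part of the API): the minimal payload for recording
+# a buy with the schemas above; all other fields are optional.
+#
+#     InvestmentTransactionCreate(
+#         account_id=account.id,
+#         symbol="AAPL",
+#         transaction_type=TransactionType.BUY,
+#         transaction_date=date(2026, 1, 2),
+#         quantity=Decimal("10"),
+#         price=Decimal("150.00"),
+#         amount=Decimal("1500.00"),
+#     )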
+ + +# ============================================================================ +# Portfolio Allocation Schemas +# ============================================================================ + +class PortfolioAllocationTargets(BaseModel): + """User's target portfolio allocation.""" + model_config = ConfigDict(from_attributes=True) + + id: UUID + user_id: UUID + target_stocks_pct: Decimal = Decimal("60.0") + target_bonds_pct: Decimal = Decimal("30.0") + target_cash_pct: Decimal = Decimal("10.0") + target_international_pct: Decimal | None = None + created_at: datetime + updated_at: datetime + + +class PortfolioAllocationTargetsUpdate(BaseModel): + """Schema for updating allocation targets.""" + target_stocks_pct: Decimal | None = None + target_bonds_pct: Decimal | None = None + target_cash_pct: Decimal | None = None + target_international_pct: Decimal | None = None + + +# ============================================================================ +# CSV Import Schemas +# ============================================================================ + +class FidelityHoldingCSVRow(BaseModel): + """Schema for parsing Fidelity CSV row.""" + account_number: str = Field(..., alias="Account Number") + account_name: str | None = Field(None, alias="Account Name") + symbol: str = Field(..., alias="Symbol") + description: str = Field(..., alias="Description") + quantity: Decimal = Field(..., alias="Quantity") + last_price: Decimal | None = Field(None, alias="Last Price") + price: Decimal | None = Field(None, alias="Price") # Alternative field name + last_price_change: Decimal | None = Field(None, alias="Last Price Change") + current_value: Decimal | None = Field(None, alias="Current Value") + market_value: Decimal | None = Field(None, alias="Market Value") # Alternative field name + todays_gain_loss_dollar: Decimal | None = Field(None, alias="Today's Gain/Loss Dollar") + day_change: Decimal | None = Field(None, alias="Day Change") # Alternative field name + todays_gain_loss_percent: Decimal | None = Field(None, alias="Today's Gain/Loss Percent") + day_change_percent: Decimal | None = Field(None, alias="Day Change %") # Alternative field name + total_gain_loss_dollar: Decimal | None = Field(None, alias="Total Gain/Loss Dollar") + cost_basis_gain_loss: Decimal | None = Field(None, alias="Cost Basis Gain/Loss") # Alternative field name + total_gain_loss_percent: Decimal | None = Field(None, alias="Total Gain/Loss Percent") + gain_loss_percent: Decimal | None = Field(None, alias="Gain/Loss %") # Alternative field name + percent_of_account: Decimal | None = Field(None, alias="Percent Of Account") + cost_basis_total: Decimal | None = Field(None, alias="Cost Basis Total") + average_cost_basis: Decimal | None = Field(None, alias="Average Cost Basis") + type_field: str | None = Field(None, alias="Type") + price_as_of_date: str | None = Field(None, alias="Price As Of Date") + + model_config = ConfigDict(populate_by_name=True) + + +class BulkHoldingsUpload(BaseModel): + """Schema for bulk uploading holdings.""" + account_id: UUID | None = None + account_name: str | None = None + account_type: str = "brokerage" + account_tax_type: AccountTaxType = AccountTaxType.TAXABLE + institution: str = "Fidelity" + holdings: list[InvestmentHoldingCreate] + + +# ============================================================================ +# Analysis Response Schemas +# ============================================================================ + +class HoldingPerformance(BaseModel): + """Performance data for a single 
holding.""" + symbol: str + quantity: Decimal + market_value: Decimal + day_change: Decimal + day_change_pct: Decimal + unrealized_gain_loss: Decimal + unrealized_gain_loss_pct: Decimal + + +class PortfolioPerformanceResponse(BaseModel): + """Response schema for portfolio performance.""" + total_value: Decimal + total_day_change: Decimal + total_day_change_pct: Decimal + total_unrealized_gain_loss: Decimal + top_gainers: list[HoldingPerformance] + top_losers: list[HoldingPerformance] + as_of_timestamp: datetime + + +class AllocationBreakdown(BaseModel): + """Allocation breakdown by category.""" + category: str + value: Decimal + percentage: Decimal + target_percentage: Decimal | None = None + variance: Decimal | None = None + + +class PortfolioAllocationResponse(BaseModel): + """Response schema for portfolio allocation analysis.""" + total_value: Decimal + by_asset_type: list[AllocationBreakdown] + by_sector: list[AllocationBreakdown] + rebalancing_needed: bool + rebalancing_suggestions: list[str] | None = None + + +class TaxHarvestingOpportunity(BaseModel): + """A single tax loss harvesting opportunity.""" + symbol: str + quantity: Decimal + cost_basis: Decimal + current_value: Decimal + unrealized_loss: Decimal + holding_period_days: int + is_long_term: bool + potential_tax_savings: Decimal + wash_sale_risk: bool + + +class TaxHarvestingResponse(BaseModel): + """Response schema for tax harvesting opportunities.""" + opportunities: list[TaxHarvestingOpportunity] + total_potential_loss: Decimal + total_potential_tax_savings: Decimal + warnings: list[str] diff --git a/financegpt_backend/app/services/investment_csv_parser.py b/financegpt_backend/app/services/investment_csv_parser.py new file mode 100644 index 0000000..88424a0 --- /dev/null +++ b/financegpt_backend/app/services/investment_csv_parser.py @@ -0,0 +1,300 @@ +"""CSV parser for Fidelity investment holdings.""" +import csv +import io +from decimal import Decimal +from typing import Any +from uuid import UUID + +from app.schemas.investments import ( + AccountTaxType, + FidelityHoldingCSVRow, + InvestmentAccountCreate, + InvestmentHoldingCreate, + SourceType, +) +from app.services.yahoo_finance_enrichment import YahooFinanceEnrichmentService + + +class FidelityCSVParser: + """Parser for Fidelity investment account CSV files.""" + + @staticmethod + async def parse_csv( + csv_content: str | bytes, + account_tax_type: AccountTaxType = AccountTaxType.TAXABLE, + ) -> dict[str, Any]: + """ + Parse Fidelity CSV and return account + holdings data. 
+
+        Args:
+            csv_content: CSV file content as string or bytes
+            account_tax_type: Tax type for the account (must be specified by user)
+
+        Returns:
+            Dict with account_data and holdings
+        """
+        # Handle bytes
+        if isinstance(csv_content, bytes):
+            csv_content = csv_content.decode('utf-8')
+
+        # Parse CSV
+        reader = csv.DictReader(io.StringIO(csv_content))
+        rows = list(reader)
+
+        if not rows:
+            raise ValueError("CSV file is empty")
+
+        # Extract account info from first row
+        first_row = rows[0]
+        account_number = first_row.get("Account Number", "").strip()
+        account_name = first_row.get("Account Name", "").strip() or f"Account {account_number}"
+
+        # Detect account type from holdings
+        account_type = FidelityCSVParser._detect_account_type(rows)
+
+        # Parse each holding
+        holdings_data = []
+        for row_data in rows:
+            try:
+                # Validate row using Pydantic
+                row = FidelityHoldingCSVRow(**row_data)
+
+                # Get market value (try both field names)
+                market_value = row.market_value if row.market_value is not None else row.current_value
+
+                # Calculate cost basis if not directly provided
+                cost_basis = row.cost_basis_total
+                if cost_basis is None and row.cost_basis_gain_loss is not None and market_value is not None:
+                    # cost_basis = market_value - gain_loss
+                    cost_basis = market_value - row.cost_basis_gain_loss
+
+                # Calculate average cost basis if not provided
+                average_cost_basis = row.average_cost_basis
+                if average_cost_basis is None and cost_basis is not None and row.quantity and row.quantity > 0:
+                    average_cost_basis = cost_basis / row.quantity
+
+                # Extract basic holding data
+                holding_data = {
+                    "symbol": row.symbol.strip(),
+                    "description": row.description.strip(),
+                    "quantity": row.quantity,
+                    "cost_basis": cost_basis,
+                    "average_cost_basis": average_cost_basis,
+                }
+                holdings_data.append(holding_data)
+
+            except Exception as e:
+                # Skip invalid rows but continue parsing
+                logger.warning("Skipped row due to error: %s", e)
+                continue
+
+        if not holdings_data:
+            raise ValueError("No valid holdings found in CSV")
+
+        return {
+            "account_data": {
+                "account_number": account_number,
+                "account_name": account_name,
+                "account_type": account_type,
+                "account_tax_type": account_tax_type,
+                "institution": "Fidelity",
+            },
+            "holdings": holdings_data,
+        }
+
+    @staticmethod
+    def _detect_account_type(rows: list[dict[str, Any]]) -> str:
+        """Detect account type based on holdings."""
+        # Look for common patterns
+        has_mutual_funds = any("fund" in row.get("Type", "").lower() for row in rows)
+        has_stocks = any("stock" in row.get("Type", "").lower() for row in rows)
+
+        if has_mutual_funds and not has_stocks:
+            return "401k"  # Likely retirement account
+        return "brokerage"
+
+    @staticmethod
+    async def parse_and_enrich_csv(
+        csv_content: str | bytes,
+        account_tax_type: AccountTaxType = AccountTaxType.TAXABLE,
+        enrich_with_yahoo: bool = True,
+    ) -> dict[str, Any]:
+        """
+        Parse CSV and optionally enrich with Yahoo Finance data.
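+
+        When enrich_with_yahoo is True, (symbol, quantity, cost_basis,
+        average_cost_basis) tuples are passed to
+        YahooFinanceEnrichmentService.batch_enrich_holdings and the results
+        are returned under the "enriched_holdings" key.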
+ + Args: + csv_content: CSV file content + account_tax_type: Tax type for the account + enrich_with_yahoo: Whether to enrich with real-time Yahoo Finance data + + Returns: + Dict with account and enriched holdings + """ + # Parse CSV + parsed_data = await FidelityCSVParser.parse_csv(csv_content, account_tax_type) + + # Enrich if requested + if enrich_with_yahoo: + holdings_to_enrich = [ + ( + h["symbol"], + h["quantity"], + h["cost_basis"], + h.get("average_cost_basis"), + ) + for h in parsed_data["holdings"] + ] + + enriched_holdings = await YahooFinanceEnrichmentService.batch_enrich_holdings( + holdings_to_enrich + ) + + # Convert to dict format + parsed_data["enriched_holdings"] = [ + h.model_dump() for h in enriched_holdings + ] + + return parsed_data + + +class GenericHoldingsParser: + """Parser for generic holdings CSV (Symbol, Quantity, Cost Basis, Average Cost).""" + + @staticmethod + def _fuzzy_match_column(columns: list[str], possible_names: list[str]) -> str | None: + """ + Find a column name using fuzzy matching. + + Tries in order: + 1. Exact match (case-insensitive) + 2. Substring match (case-insensitive) + + Args: + columns: List of actual column names from CSV + possible_names: List of possible column names to search for + + Returns: + Matched column name or None if not found + """ + columns_lower = {col.lower(): col for col in columns} + + # Try exact match first + for name in possible_names: + if name.lower() in columns_lower: + return columns_lower[name.lower()] + + # Try substring match + for name in possible_names: + name_lower = name.lower() + for col_lower, col_original in columns_lower.items(): + if name_lower in col_lower: + return col_original + + return None + + @staticmethod + async def parse_minimal_csv( + csv_content: str | bytes, + account_id: UUID, + ) -> list[InvestmentHoldingCreate]: + """ + Parse minimal CSV with just Symbol, Quantity, Cost Basis, Average Cost. + Uses fuzzy column matching to handle different CSV formats. + + Args: + csv_content: CSV with columns for symbol, quantity, cost basis, etc. 
account_id: UUID of the account these holdings belong to
+
+        Returns:
+            List of InvestmentHoldingCreate objects ready to be enriched
+        """
+        # Handle bytes
+        if isinstance(csv_content, bytes):
+            csv_content = csv_content.decode('utf-8')
+
+        # Parse CSV
+        reader = csv.DictReader(io.StringIO(csv_content))
+        rows = list(reader)
+
+        if not rows:
+            raise ValueError("CSV file is empty")
+
+        # Get column names and find matches
+        columns = list(rows[0].keys())
+
+        # Define possible column names for each field (in order of preference)
+        symbol_col = GenericHoldingsParser._fuzzy_match_column(
+            columns, ["Symbol", "Ticker", "Stock Symbol"]
+        )
+        quantity_col = GenericHoldingsParser._fuzzy_match_column(
+            columns, ["Quantity", "Shares", "Qty", "Units"]
+        )
+        value_col = GenericHoldingsParser._fuzzy_match_column(
+            columns, ["Current Value", "Market Value", "Value", "Total Value"]
+        )
+        cost_basis_col = GenericHoldingsParser._fuzzy_match_column(
+            columns, ["Cost Basis Total", "Cost Basis", "Total Cost", "Original Cost"]
+        )
+        gain_loss_col = GenericHoldingsParser._fuzzy_match_column(
+            columns, ["Cost Basis Gain/Loss", "Gain/Loss", "Total Gain/Loss Dollar", "Total Gain/Loss"]
+        )
+        avg_cost_col = GenericHoldingsParser._fuzzy_match_column(
+            columns, ["Average Cost Basis", "Average Cost", "Avg Cost", "Cost Per Share"]
+        )
+
+        # Require symbol and quantity at minimum
+        if not symbol_col:
+            raise ValueError("Could not find Symbol column in CSV")
+        if not quantity_col:
+            raise ValueError("Could not find Quantity column in CSV")
+
+        holdings = []
+        for row in rows:
+            try:
+                symbol = row[symbol_col].strip().upper()
+                if not symbol:
+                    continue
+
+                quantity = Decimal(row[quantity_col] or 0)
+                if quantity <= 0:
+                    continue
+
+                # Get market value if available
+                market_value = None
+                if value_col and row.get(value_col):
+                    market_value = Decimal(row[value_col])
+
+                # Calculate cost basis
+                cost_basis = None
+                if cost_basis_col and row.get(cost_basis_col):
+                    cost_basis = Decimal(row[cost_basis_col])
+                elif gain_loss_col and value_col and row.get(gain_loss_col) and row.get(value_col):
+                    # Calculate: cost_basis = market_value - gain_loss
+                    market_value = Decimal(row[value_col])
+                    gain_loss = Decimal(row[gain_loss_col])
+                    cost_basis = market_value - gain_loss
+
+                # Get average cost basis
+                average_cost = None
+                if avg_cost_col and row.get(avg_cost_col):
+                    average_cost = Decimal(row[avg_cost_col])
+                elif cost_basis and quantity > 0:
+                    average_cost = cost_basis / quantity
+
+                holding = InvestmentHoldingCreate(
+                    account_id=account_id,
+                    symbol=symbol,
+                    quantity=quantity,
+                    cost_basis=cost_basis if cost_basis and cost_basis > 0 else None,
+                    average_cost_basis=average_cost if average_cost and average_cost > 0 else None,
+                )
+                holdings.append(holding)
+
+            except (ValueError, InvalidOperation) as e:
+                logger.warning("Skipped row due to error: %s", e)
+                continue
+
+        if not holdings:
+            raise ValueError("No valid holdings found in CSV")
+
+        return holdings
diff --git a/financegpt_backend/app/services/llm_document_extractor.py b/financegpt_backend/app/services/llm_document_extractor.py
new file mode 100644
index 0000000..daabcc5
--- /dev/null
+++ b/financegpt_backend/app/services/llm_document_extractor.py
@@ -0,0 +1,395 @@
+"""
+LLM-based document extraction service for investment holdings.
+
+Uses instructor + LLM to extract structured data from any document format:
+- PDFs (brokerage statements, account summaries)
+- Excel/CSV files with non-standard formats
+- Images of statements
+- Word documents
+
+This provides a flexible, format-agnostic approach that handles variations
+across different financial institutions.
+"""
+
+import logging
+from typing import Optional, BinaryIO
+from io import BytesIO
+
+import instructor
+from openai import AsyncOpenAI
+from pydantic import BaseModel, Field
+
+from app.config import config
+from app.schemas.investments import (
+    InvestmentHoldingEnriched,
+    AccountTaxType,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class ExtractedHoldingsData(BaseModel):
+    """Structured data extracted from a document."""
+
+    account_name: Optional[str] = Field(
+        None,
+        description="Account name or description (e.g., 'Individual Brokerage', 'Roth IRA')"
+    )
+    account_number: Optional[str] = Field(
+        None,
+        description="Account number if visible in document"
+    )
+    account_type: Optional[str] = Field(
+        None,
+        description="Type of account: brokerage, 401k, ira_traditional, ira_roth, etc."
+    )
+    institution: Optional[str] = Field(
+        None,
+        description="Financial institution name (e.g., 'Fidelity', 'Vanguard', 'Charles Schwab')"
+    )
+    statement_date: Optional[str] = Field(
+        None,
+        description="Statement or report date in ISO format (YYYY-MM-DD)"
+    )
+    holdings: list[dict] = Field(
+        default_factory=list,
+        description="List of investment holdings with symbol, quantity, cost_basis, etc."
+    )
+
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "account_name": "Individual Brokerage",
+                "account_number": "Z12345678",
+                "account_type": "brokerage",
+                "institution": "Fidelity",
+                "statement_date": "2026-01-29",
+                "holdings": [
+                    {
+                        "symbol": "AAPL",
+                        "description": "Apple Inc",
+                        "quantity": 100,
+                        "cost_basis": 15000.00,
+                        "average_cost_basis": 150.00,
+                    }
+                ]
+            }
+        }
+
+
+class LLMDocumentExtractor:
+    """Extract investment holdings from documents using LLM."""
+
+    def __init__(self):
+        """Initialize the LLM client with instructor."""
+        # Use the configured LLM from FinanceGPT settings
+        self.client = instructor.from_openai(
+            AsyncOpenAI(
+                api_key=config.OPENAI_API_KEY,
+                base_url=config.OPENAI_BASE_URL if hasattr(config, 'OPENAI_BASE_URL') else None,
+            )
+        )
+
+    async def extract_from_text(
+        self,
+        text_content: str,
+        account_tax_type: Optional[AccountTaxType] = None,
+    ) -> dict:
+        """
+        Extract holdings from plain text content.
+
+        Args:
+            text_content: Text extracted from document
+            account_tax_type: Optional tax type hint
+
+        Returns:
+            Dictionary with account_data and holdings
+        """
+        system_prompt = """You are a financial document parser. Extract investment holdings data from the provided text.
+
+Extract the following information:
+1. Account details (name, number, type, institution)
+2. For each holding:
+   - Symbol (ticker symbol)
+   - Description (company/fund name)
+   - Quantity (number of shares/units)
+   - Cost Basis (total purchase cost)
+   - Average Cost Basis (cost per share)
+   - Current Price (if available)
+   - Market Value (if available)
+
+Important:
+- Only extract data you can see in the document
+- Use null/None for missing fields
+- Preserve exact numeric values
+- Extract ALL holdings you find
+- Account type should be one of: brokerage, 401k, ira_traditional, ira_roth, 529, hsa
+"""
+
+        user_prompt = f"""Extract investment holdings from this document:
+
+{text_content}
+
+{f"Note: This is a {account_tax_type} account." if account_tax_type else ""}
+"""
+
+        try:
+            # Use instructor to get structured output
+            extracted_data = await self.client.chat.completions.create(
+                model="gpt-4o",  # Use GPT-4 for better extraction accuracy
+                response_model=ExtractedHoldingsData,
+                messages=[
+                    {"role": "system", "content": system_prompt},
+                    {"role": "user", "content": user_prompt},
+                ],
+                temperature=0,  # Deterministic extraction
+            )
+
+            # Convert to our format
+            account_data = {
+                "account_name": extracted_data.account_name or "Uploaded Account",
+                "account_number": extracted_data.account_number or f"DOC-{hash(text_content[:100]) % 100000:05d}",
+                "account_type": extracted_data.account_type or "brokerage",
+                "account_tax_type": account_tax_type or "taxable",
+                "institution": extracted_data.institution or "Unknown",
+            }
+
+            # Validate and structure holdings
+            holdings = []
+            for holding in extracted_data.holdings:
+                # Ensure required fields
+                if not holding.get("symbol"):
+                    logger.warning(f"Skipping holding without symbol: {holding}")
+                    continue
+
+                # Build holding dict with required fields
+                holding_dict = {
+                    "symbol": holding["symbol"],
+                    "description": holding.get("description"),
+                    "quantity": float(holding.get("quantity", 0)),
+                    "cost_basis": float(holding.get("cost_basis", 0)),
+                    "average_cost_basis": float(holding.get("average_cost_basis", 0)),
+                }
+
+                # Add optional fields if present
+                if "current_price" in holding and holding["current_price"]:
+                    holding_dict["current_price"] = float(holding["current_price"])
+                if "market_value" in holding and holding["market_value"]:
+                    holding_dict["market_value"] = float(holding["market_value"])
+
+                holdings.append(holding_dict)
+
+            return {
+                "account_data": account_data,
+                "holdings": holdings,
+                "extraction_method": "llm",
+            }
+
+        except Exception as e:
+            logger.error(f"LLM extraction failed: {e}")
+            raise ValueError(f"Failed to extract holdings from document: {str(e)}")
+
+    async def extract_from_pdf(
+        self,
+        pdf_file: BinaryIO,
+        account_tax_type: Optional[AccountTaxType] = None,
+    ) -> dict:
+        """
+        Extract holdings from PDF file.
+
+        Args:
+            pdf_file: PDF file binary stream
+            account_tax_type: Optional tax type hint
+
+        Returns:
+            Dictionary with account_data and holdings
+        """
+        try:
+            import pdfplumber
+        except ImportError:
+            raise ImportError("pdfplumber is required for PDF parsing. Install with: pip install pdfplumber")
+
+        # Extract text and tables from PDF
+        text_content = []
+
+        with pdfplumber.open(pdf_file) as pdf:
+            # Extract text from all pages
+            for page in pdf.pages:
+                text_content.append(page.extract_text())
+
+                # Also extract tables which might contain holdings
+                tables = page.extract_tables()
+                for table in tables:
+                    # Convert table to readable text format
+                    if table:
+                        table_text = "\n".join(["\t".join(str(cell) for cell in row if cell) for row in table])
+                        text_content.append(table_text)
+
+        # Combine all extracted content
+        full_text = "\n\n".join(filter(None, text_content))
+
+        if not full_text.strip():
+            raise ValueError("No text could be extracted from the PDF")
+
+        # Use LLM to extract structured data
+        return await self.extract_from_text(full_text, account_tax_type)
+
+    async def extract_from_excel(
+        self,
+        excel_file: BinaryIO,
+        account_tax_type: Optional[AccountTaxType] = None,
+    ) -> dict:
+        """
+        Extract holdings from Excel file.
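+
+        Sheets are flattened to tab-separated text and passed to
+        extract_from_text; the workbook is loaded with data_only=True, so
+        formulas are read as their last-saved values.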
+
+        Args:
+            excel_file: Excel file binary stream
+            account_tax_type: Optional tax type hint
+
+        Returns:
+            Dictionary with account_data and holdings
+        """
+        try:
+            import openpyxl
+        except ImportError:
+            raise ImportError("openpyxl is required for Excel parsing. Install with: pip install openpyxl")
+
+        # Load workbook
+        workbook = openpyxl.load_workbook(excel_file, data_only=True)
+
+        # Extract all sheets as text
+        text_content = []
+
+        for sheet_name in workbook.sheetnames:
+            sheet = workbook[sheet_name]
+            text_content.append(f"Sheet: {sheet_name}")
+
+            # Convert sheet to text format
+            for row in sheet.iter_rows(values_only=True):
+                row_text = "\t".join(str(cell) if cell is not None else "" for cell in row)
+                if row_text.strip():
+                    text_content.append(row_text)
+
+        # Combine all content
+        full_text = "\n".join(text_content)
+
+        if not full_text.strip():
+            raise ValueError("No data could be extracted from the Excel file")
+
+        # Use LLM to extract structured data
+        return await self.extract_from_text(full_text, account_tax_type)
+
+    async def extract_from_image(
+        self,
+        image_file: BinaryIO,
+        account_tax_type: Optional[AccountTaxType] = None,
+    ) -> dict:
+        """
+        Extract holdings from image file using GPT-4 Vision.
+
+        Args:
+            image_file: Image file binary stream
+            account_tax_type: Optional tax type hint
+
+        Returns:
+            Dictionary with account_data and holdings
+        """
+        import base64
+
+        # Read image and encode to base64
+        image_bytes = image_file.read()
+        base64_image = base64.b64encode(image_bytes).decode('utf-8')
+
+        # Detect image format
+        image_format = "jpeg"  # default
+        if image_bytes.startswith(b'\x89PNG'):
+            image_format = "png"
+        elif image_bytes.startswith(b'GIF'):
+            image_format = "gif"
+
+        system_prompt = """You are a financial document parser. Extract investment holdings data from the provided image.
+
+Extract the following information:
+1. Account details (name, number, type, institution)
+2. For each holding:
+   - Symbol (ticker symbol)
+   - Description (company/fund name)
+   - Quantity (number of shares/units)
+   - Cost Basis (total purchase cost)
+   - Average Cost Basis (cost per share)
+   - Current Price (if available)
+   - Market Value (if available)
+
+Important:
+- Only extract data you can see in the image
+- Use null/None for missing fields
+- Preserve exact numeric values
+- Extract ALL holdings you find
+- Account type should be one of: brokerage, 401k, ira_traditional, ira_roth, 529, hsa
+"""
+
+        user_prompt = "Extract investment holdings from this statement image."
+        if account_tax_type:
+            user_prompt += f"\nNote: This is a {account_tax_type} account."
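+
+        # Note: the image is sent inline as a base64 data URL, which inflates
+        # the request payload by roughly 4/3x; very large scans may need resizing first.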
+        try:
+            # Use GPT-4 Vision to extract data
+            extracted_data = await self.client.chat.completions.create(
+                model="gpt-4o",
+                response_model=ExtractedHoldingsData,
+                messages=[
+                    {"role": "system", "content": system_prompt},
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": user_prompt},
+                            {
+                                "type": "image_url",
+                                "image_url": {
+                                    "url": f"data:image/{image_format};base64,{base64_image}"
+                                }
+                            }
+                        ]
+                    },
+                ],
+                temperature=0,
+            )
+
+            # Convert to our format (same as extract_from_text)
+            account_data = {
+                "account_name": extracted_data.account_name or "Uploaded Account",
+                "account_number": extracted_data.account_number or f"IMG-{hash(base64_image[:100]) % 100000:05d}",
+                "account_type": extracted_data.account_type or "brokerage",
+                "account_tax_type": account_tax_type or "taxable",
+                "institution": extracted_data.institution or "Unknown",
+            }
+
+            holdings = []
+            for holding in extracted_data.holdings:
+                if not holding.get("symbol"):
+                    continue
+
+                holding_dict = {
+                    "symbol": holding["symbol"],
+                    "description": holding.get("description"),
+                    "quantity": float(holding.get("quantity", 0)),
+                    "cost_basis": float(holding.get("cost_basis", 0)),
+                    "average_cost_basis": float(holding.get("average_cost_basis", 0)),
+                }
+
+                if "current_price" in holding and holding["current_price"]:
+                    holding_dict["current_price"] = float(holding["current_price"])
+                if "market_value" in holding and holding["market_value"]:
+                    holding_dict["market_value"] = float(holding["market_value"])
+
+                holdings.append(holding_dict)
+
+            return {
+                "account_data": account_data,
+                "holdings": holdings,
+                "extraction_method": "llm_vision",
+            }
+
+        except Exception as e:
+            logger.error(f"Vision extraction failed: {e}")
+            raise ValueError(f"Failed to extract holdings from image: {str(e)}")
diff --git a/financegpt_backend/app/services/llm_service.py b/financegpt_backend/app/services/llm_service.py
index 33f073d..733e1de 100644
--- a/financegpt_backend/app/services/llm_service.py
+++ b/financegpt_backend/app/services/llm_service.py
@@ -351,3 +351,29 @@ async def get_user_long_context_llm(
     The user_id parameter is ignored as LLM preferences are now per-search-space.
     """
     return await get_document_summary_llm(session, search_space_id)
+
+
+async def get_system_llm() -> ChatLiteLLM:
+    """
+    Get a system LLM for background tasks that don't have a specific user/search space.
+    Uses the first global LLM config available.
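+
+    Example (illustrative):
+        llm = await get_system_llm()
+        reply = await llm.ainvoke("Classify this transaction: ...")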
+
+    Returns:
+        ChatLiteLLM instance configured with system default LLM
+    """
+    # Use the first global LLM config
+    if not config.GLOBAL_LLM_CONFIGS:
+        raise RuntimeError("No global LLM configs available for system tasks")
+
+    global_config = config.GLOBAL_LLM_CONFIGS[0]
+
+    return ChatLiteLLM(
+        model=global_config["model"],
+        api_key=global_config.get("api_key"),
+        base_url=global_config.get("api_base"),
+        temperature=0.1,  # Low temperature for more consistent parsing
+    )
diff --git a/financegpt_backend/app/services/llm_stock_enrichment.py b/financegpt_backend/app/services/llm_stock_enrichment.py
new file mode 100644
index 0000000..3a6a151
--- /dev/null
+++ b/financegpt_backend/app/services/llm_stock_enrichment.py
@@ -0,0 +1,161 @@
+"""LLM-based stock price enrichment service using web search."""
+import logging
+import os
+import re
+import ssl
+from decimal import Decimal
+
+# Monkey-patch SSL to disable certificate verification globally
+# This is needed for corporate proxies with self-signed certificates
+_original_create_default_context = ssl.create_default_context
+
+def _create_unverified_context(*args, **kwargs):
+    context = _original_create_default_context(*args, **kwargs)
+    context.check_hostname = False
+    context.verify_mode = ssl.CERT_NONE
+    return context
+
+ssl.create_default_context = _create_unverified_context
+ssl._create_default_https_context = _create_unverified_context
+
+from linkup import LinkupClient
+
+from app.schemas.investments import AssetType, InvestmentHoldingEnriched
+
+logger = logging.getLogger(__name__)
+
+
+class LLMStockEnrichmentService:
+    """Service to enrich investment holdings using Linkup web search."""
+
+    @staticmethod
+    async def enrich_holding(
+        symbol: str,
+        quantity: Decimal,
+        cost_basis: Decimal,
+        average_cost_basis: Decimal | None = None,
+    ) -> InvestmentHoldingEnriched:
+        """
+        Enrich a holding with current data using Linkup web search.
+
+        Args:
+            symbol: Stock ticker symbol
+            quantity: Number of shares
+            cost_basis: Total cost basis
+            average_cost_basis: Average cost per share
+
+        Returns:
+            Enriched holding with current prices and performance
+        """
+        # Ensure all inputs are Decimal type
+        quantity = Decimal(str(quantity))
+        cost_basis = Decimal(str(cost_basis))
+        if average_cost_basis is not None:
+            average_cost_basis = Decimal(str(average_cost_basis))
+
+        try:
+            # Get Linkup API key from environment
+            api_key = os.getenv("LINKUP_API_KEY")
+            if not api_key:
+                raise ValueError("LINKUP_API_KEY not set in environment")
+
+            # Initialize Linkup client (SSL verification disabled via monkey-patch above)
+            client = LinkupClient(api_key=api_key)
+
+            # Search for current stock price - use sourcedAnswer for better extraction
+            query = f"Current stock price of {symbol} ticker symbol. Provide exact numeric price value."
+
+            # Perform search with sourcedAnswer to get structured response
+            response = client.search(
+                query=query,
+                depth="standard",
+                output_type="sourcedAnswer"  # Get structured answer instead of raw results
+            )
+
+            # Extract the answer text
+            answer_text = ""
+            if hasattr(response, 'answer'):
+                answer_text = str(response.answer)
+            elif hasattr(response, 'text'):
+                answer_text = str(response.text)
+            else:
+                answer_text = str(response)
+
+            logger.info(f"Linkup answer for {symbol}: {answer_text}")
+
+            # Try to extract current price using improved regex
+            # Look for patterns like "$123.45", "123.45 USD", "price: 123.45", etc.
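+            # For example, "AAPL is trading at $189.23" yields 189.23 via the
+            # first pattern; patterns run from most to least specific, with the
+            # bare two-decimal match as a last resort.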
+            price_patterns = [
+                r'\$(\d+\.?\d*)',  # $123.45
+                r'(\d+\.\d{2})\s*(?:USD|dollars?)',  # 123.45 USD
+                r'price[:\s]+\$?(\d+\.\d{2})',  # price: $123.45 or price 123.45
+                r'(\d{1,4}\.\d{2})',  # Any number with 2 decimals (last resort)
+            ]
+
+            current_price = Decimal("0")
+            for pattern in price_patterns:
+                matches = re.findall(pattern, answer_text, re.IGNORECASE)
+                if matches:
+                    # Take the first match and try to validate it's reasonable
+                    price_candidate = Decimal(matches[0])
+                    # Basic validation: stock prices typically between $0.01 and $10,000
+                    if Decimal("0.01") <= price_candidate <= Decimal("10000"):
+                        current_price = price_candidate
+                        break
+
+            if current_price == 0:
+                raise ValueError(f"Could not extract valid price from Linkup answer: {answer_text}")
+
+            # For now, use current price as previous close (no day change data from search)
+            previous_close = current_price
+
+            # Calculate market value and performance
+            market_value = quantity * current_price
+            day_change = current_price - previous_close
+            day_change_pct = (day_change / previous_close * 100) if previous_close > 0 else Decimal("0")
+
+            unrealized_gain_loss = market_value - cost_basis
+            unrealized_gain_loss_pct = (unrealized_gain_loss / cost_basis * 100) if cost_basis > 0 else Decimal("0")
+
+            return InvestmentHoldingEnriched(
+                symbol=symbol,
+                description=symbol,
+                quantity=quantity,
+                cost_basis=cost_basis,
+                average_cost_basis=average_cost_basis or (cost_basis / quantity if quantity > 0 else Decimal("0")),
+                current_price=current_price,
+                market_value=market_value,
+                day_change=day_change,
+                day_change_pct=day_change_pct,
+                unrealized_gain_loss=unrealized_gain_loss,
+                unrealized_gain_loss_pct=unrealized_gain_loss_pct,
+                sector=None,
+                industry=None,
+                geographic_region="US",
+                asset_type=AssetType.STOCK,
+            )
+
+        except Exception as e:
+            logger.error(f"Linkup enrichment failed for {symbol}: {e}")
+            # Return basic data without enrichment
+            market_value = cost_basis  # Use cost basis as fallback
+            return InvestmentHoldingEnriched(
+                symbol=symbol,
+                description=symbol,
+                quantity=quantity,
+                cost_basis=cost_basis,
+                average_cost_basis=average_cost_basis or (cost_basis / quantity if quantity > 0 else Decimal("0")),
+                current_price=Decimal("0"),
+                market_value=market_value,
+                day_change=Decimal("0"),
+                day_change_pct=Decimal("0"),
+                unrealized_gain_loss=Decimal("0"),
+                unrealized_gain_loss_pct=Decimal("0"),
+                sector=None,
+                industry=None,
+                geographic_region="US",
+                asset_type=AssetType.STOCK,
+            )
diff --git a/financegpt_backend/app/services/yahoo_finance_enrichment.py b/financegpt_backend/app/services/yahoo_finance_enrichment.py
new file mode 100644
index 0000000..24434b1
--- /dev/null
+++ b/financegpt_backend/app/services/yahoo_finance_enrichment.py
@@ -0,0 +1,209 @@
+"""Yahoo Finance enrichment service for investment holdings."""
+import asyncio
+import os
+from datetime import datetime
+from decimal import Decimal
+from typing import Any
+
+# Disable SSL verification for curl_cffi before importing yfinance
+# Set environment variables that curl_cffi checks
+os.environ["CURL_CA_BUNDLE"] = ""
+os.environ["SSL_CERT_FILE"] = ""
+os.environ["REQUESTS_CA_BUNDLE"] = ""
+
+import yfinance as yf
+from app.schemas.investments import (
+    AssetType,
+    InvestmentHoldingEnriched,
+)
+
+# Try to patch yfinance's session to disable SSL verification
+try:
+    from curl_cffi import requests as curl_requests
+
+    # Create a custom session with SSL verification disabled
+    _original_session_init = curl_requests.Session.__init__
+
+    def _patched_session_init(self, *args, **kwargs):
+        _original_session_init(self, *args, **kwargs)
+        # Disable SSL verification
+        self.verify = False
+
+    curl_requests.Session.__init__ = _patched_session_init
+except Exception:
+    pass  # If patching fails, continue anyway
+
+yf.set_tz_cache_location("/tmp/yfinance-tz-cache")  # Use temp dir for cache
+
+
+class YahooFinanceEnrichmentService:
+    """Service to enrich investment holdings with Yahoo Finance data."""
+
+    @staticmethod
+    def _map_quote_type_to_asset_type(quote_type: str) -> AssetType:
+        """Map Yahoo Finance quoteType to our AssetType enum."""
+        mapping = {
+            "EQUITY": AssetType.STOCK,
+            "ETF": AssetType.ETF,
+            "MUTUALFUND": AssetType.MUTUAL_FUND,
+            "CRYPTOCURRENCY": AssetType.CRYPTO,
+            "INDEX": AssetType.OTHER,
+        }
+        return mapping.get(quote_type, AssetType.STOCK)
+
+    @staticmethod
+    async def enrich_holding(
+        symbol: str,
+        quantity: Decimal,
+        cost_basis: Decimal,
+        average_cost_basis: Decimal | None = None,
+    ) -> InvestmentHoldingEnriched:
+        """
+        Enrich a holding with real-time data from Yahoo Finance.
+
+        Args:
+            symbol: Stock ticker symbol
+            quantity: Number of shares
+            cost_basis: Total cost basis
+            average_cost_basis: Average cost per share
+
+        Returns:
+            Enriched holding with current prices, performance, and metadata
+        """
+        # Run Yahoo Finance API in thread pool (it's blocking)
+        loop = asyncio.get_event_loop()
+        # Let yfinance handle the session internally with curl_cffi
+        ticker = await loop.run_in_executor(None, lambda: yf.Ticker(symbol))
+
+        try:
+            # Get current quote data
+            info: dict[str, Any] = await loop.run_in_executor(None, lambda: ticker.info)
+
+            # Extract price data
+            current_price = Decimal(str(info.get("currentPrice") or info.get("regularMarketPrice", 0)))
+            previous_close = Decimal(str(info.get("previousClose", current_price)))
+
+            # Calculate market value and performance
+            market_value = quantity * current_price
+            day_change = current_price - previous_close
+            day_change_pct = (day_change / previous_close * 100) if previous_close > 0 else Decimal("0")
+
+            unrealized_gain_loss = market_value - cost_basis
+            unrealized_gain_loss_pct = (unrealized_gain_loss / cost_basis * 100) if cost_basis > 0 else Decimal("0")
+
+            # Extract classification data
+            quote_type = info.get("quoteType", "EQUITY")
+            asset_type = YahooFinanceEnrichmentService._map_quote_type_to_asset_type(quote_type)
+
+            sector = info.get("sector")
+            industry = info.get("industry")
+
+            # Infer geographic region from exchange
+            exchange = info.get("exchange", "")
+            if exchange in ["NMS", "NYQ", "PCX", "NAS"]:  # US exchanges
+                geographic_region = "US"
+            elif exchange in ["TOR"]:
+                geographic_region = "Canada"
+            elif exchange in ["LON"]:
+                geographic_region = "UK"
+            else:
+                geographic_region = info.get("country", "Unknown")
+
+            return InvestmentHoldingEnriched(
+                symbol=symbol,
+                description=info.get("longName") or info.get("shortName"),
+                quantity=quantity,
+                cost_basis=cost_basis,
+                average_cost_basis=average_cost_basis or (cost_basis / quantity if quantity > 0 else Decimal("0")),
+                current_price=current_price,
+                market_value=market_value,
+                previous_close=previous_close,
+                day_change=day_change,
+                day_change_pct=day_change_pct,
+                unrealized_gain_loss=unrealized_gain_loss,
+                unrealized_gain_loss_pct=unrealized_gain_loss_pct,
+                asset_type=asset_type,
+                sector=sector,
+                industry=industry,
+                geographic_region=geographic_region,
+                price_as_of_timestamp=datetime.now(),
+                extraction_confidence=Decimal("0.95"),  # High confidence for API data
+            )
+
+        except (KeyError, ValueError, TypeError) as e:
+            # Fallback with minimal data if API fails
+            return InvestmentHoldingEnriched(
+                symbol=symbol,
+                description=None,
+                quantity=quantity,
+                cost_basis=cost_basis,
+                average_cost_basis=average_cost_basis,
+                current_price=None,
+                market_value=None,
+                extraction_confidence=Decimal("0.0"),  # Low confidence when enrichment fails
+            )
+
+    @staticmethod
+    async def batch_enrich_holdings(
+        holdings: list[tuple[str, Decimal, Decimal, Decimal | None]]
+    ) -> list[InvestmentHoldingEnriched]:
+        """
+        Enrich multiple holdings in parallel.
+
+        Args:
+            holdings: List of (symbol, quantity, cost_basis, average_cost_basis) tuples
+
+        Returns:
+            List of enriched holdings
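+
+        Example (illustrative):
+            enriched = await YahooFinanceEnrichmentService.batch_enrich_holdings(
+                [("AAPL", Decimal("10"), Decimal("1500.00"), None)]
+            )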
+        """
+        tasks = [
+            YahooFinanceEnrichmentService.enrich_holding(symbol, qty, cost, avg_cost)
+            for symbol, qty, cost, avg_cost in holdings
+        ]
+        return await asyncio.gather(*tasks)
+
+    @staticmethod
+    async def refresh_holding_prices(
+        holdings: list[dict[str, Any]]
+    ) -> list[dict[str, Any]]:
+        """
+        Refresh just the price data for existing holdings.
+
+        Args:
+            holdings: List of holding dicts with symbol and quantity
+
+        Returns:
+            Updated holdings with latest prices
+        """
+        updated_holdings = []
+
+        for holding in holdings:
+            symbol = holding["symbol"]
+            quantity = Decimal(str(holding["quantity"]))
+
+            loop = asyncio.get_event_loop()
+            ticker = await loop.run_in_executor(None, yf.Ticker, symbol)
+
+            try:
+                info: dict[str, Any] = await loop.run_in_executor(None, lambda: ticker.info)
+                current_price = Decimal(str(info.get("currentPrice") or info.get("regularMarketPrice", 0)))
+                previous_close = Decimal(str(info.get("previousClose", current_price)))
+
+                holding["current_price"] = current_price
+                holding["market_value"] = quantity * current_price
+                holding["day_change"] = current_price - previous_close
+                holding["day_change_pct"] = (holding["day_change"] / previous_close * 100) if previous_close > 0 else Decimal("0")
+                holding["price_as_of_timestamp"] = datetime.now()
+
+            except Exception:
+                # Keep existing data if refresh fails
+                pass
+
+            updated_holdings.append(holding)
+
+        return updated_holdings
diff --git a/financegpt_backend/app/tasks/document_processors/file_processors.py b/financegpt_backend/app/tasks/document_processors/file_processors.py
index 424d06d..f7eecb4 100644
--- a/financegpt_backend/app/tasks/document_processors/file_processors.py
+++ b/financegpt_backend/app/tasks/document_processors/file_processors.py
@@ -12,7 +12,6 @@
 import httpx
 from fastapi import HTTPException
 from langchain_core.documents import Document as LangChainDocument
-from litellm import atranscription
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
@@ -53,6 +52,88 @@
 )
 
 
+async def _save_investment_holdings(
+    session: AsyncSession,
+    user_id: str,
+    holdings_data: list,
+    metadata: dict,
+    filename: str,
+) -> None:
+    """
+    Save investment holdings to structured database table.
+
+    Args:
+        session: Database session
+        user_id: User ID
+        holdings_data: List of holdings from parser
+        metadata: Metadata from parser (institution, account info, etc.)
+        filename: Original filename
+    """
+    from uuid import UUID
+    from app.db import InvestmentAccount, InvestmentHolding
+    import uuid
+
+    try:
+        # Create or get investment account
+        account = InvestmentAccount(
+            user_id=UUID(user_id),
+            account_number=metadata.get("account_number", f"FILE-{uuid.uuid4().hex[:8]}"),
+            account_name=metadata.get("account_name", filename),
+            account_type=metadata.get("account_type", "brokerage"),
+            account_tax_type=metadata.get("account_tax_type", "taxable"),
+            institution=metadata.get("institution", "Unknown"),
+            total_value=0.0,  # Will be calculated from holdings
+            source_type="document",  # Uploaded from file
+        )
+        session.add(account)
+        await session.flush()  # Get account ID
+
+        # Process each holding
+        total_value = 0.0
+        for holding_data in holdings_data:
+            symbol = holding_data.symbol if hasattr(holding_data, 'symbol') else holding_data.get('symbol', '')
+
+            # Handle None values from CSV parsing
+            quantity_raw = holding_data.quantity if hasattr(holding_data, 'quantity') else holding_data.get('quantity')
+            cost_basis_raw = holding_data.cost_basis if hasattr(holding_data, 'cost_basis') else holding_data.get('cost_basis')
+            current_value_raw = holding_data.value if hasattr(holding_data, 'value') else holding_data.get('value')
+
+            quantity = float(quantity_raw or 0)
+            cost_basis = float(cost_basis_raw or 0)
+            current_value = float(current_value_raw or 0)
+
+            if not symbol or quantity == 0:
+                continue
+
+            # Calculate average cost basis
+            average_cost_basis = cost_basis / quantity if quantity > 0 and cost_basis > 0 else 0
+
+            # Save holding with basic data only (no enrichment during upload)
+            # Prices will be fetched on-demand when user queries their portfolio
+            holding = InvestmentHolding(
+                account_id=account.id,
+                symbol=symbol,
+                quantity=quantity,
+                cost_basis=cost_basis,
+                average_cost_basis=average_cost_basis,
+                market_value=current_value,  # Use current_value from CSV as fallback
+            )
+
+            total_value += current_value
+            session.add(holding)
+
+        # Update account total value
+        account.total_value = total_value
+        await session.flush()
+
+        logger.info(f"Saved {len(holdings_data)} investment holdings for user {user_id}")
+
+    except Exception as e:
+        logger.error(f"Failed to save investment holdings: {e}")
+        # Don't fail the entire document upload if holdings save fails
+        # The document will still be searchable, just won't have structured data
+
+
 def get_google_drive_unique_identifier(
     connector: dict | None,
     filename: str,
@@ -819,6 +900,10 @@ async def _process_financial_data(
         gain_loss = holding.gain_loss if hasattr(holding, 'gain_loss') else holding.get('gain_loss')
         gain_loss_pct = holding.gain_loss_percent if hasattr(holding, 'gain_loss_percent') else holding.get('gain_loss_percent')
 
+        # Handle None values for formatting
+        value = value if value is not None else 0
+        quantity = quantity if quantity is not None else 0
+
         # Basic info
         markdown_parts.append(f"- **{symbol}**: {quantity} shares @ ${value:.2f}")
 
@@ -860,8 +945,8 @@ async def _process_financial_data(
         "holdings": [
             {
                 "symbol": h.symbol if hasattr(h, 'symbol') else h.get('symbol', ''),
-                "quantity": float(h.quantity) if hasattr(h, 'quantity') else float(h.get('quantity', 0)),
-                "value": float(h.value) if hasattr(h, 'value') else float(h.get('value', 0)),
+                "quantity": float(h.quantity) if (hasattr(h, 'quantity') and h.quantity is not None) else 0.0,
+                "value": float(h.value) if (hasattr(h, 'value') and h.value is not None) else 0.0,
                 "cost_basis": float(h.cost_basis) if (hasattr(h, 'cost_basis') and h.cost_basis is not None) else None,
                 "gain_loss": float(h.gain_loss) if (hasattr(h, 'gain_loss') and h.gain_loss is not None) else None,
                 "gain_loss_percent": float(h.gain_loss_percent) if (hasattr(h, 'gain_loss_percent') and h.gain_loss_percent is not None) else None,
@@ -895,6 +980,16 @@ async def _process_financial_data(
 
     # Create new document with embeddings and chunks for search
 
+    # Save investment holdings to structured table if this is an investment file
+    if holdings:
+        await _save_investment_holdings(
+            session=session,
+            user_id=user_id,
+            holdings_data=holdings,
+            metadata=metadata,
+            filename=filename,
+        )
+
     # Get user's LLM for summary generation
     user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
 
@@ -987,7 +1082,10 @@ async def process_file_in_background(
         try:
             parser = ParserFactory.get_parser(detected_format)
-            financial_data = await parser.parse_file(file_content, filename)
+            # Pass session context to parser for LLM access
+            financial_data = await parser.parse_file(
+                file_content, filename, session, user_id, search_space_id
+            )
 
             result = await _process_financial_data(
                 session, filename, financial_data, search_space_id, user_id, file_path, task_logger, log_entry
@@ -1084,148 +1182,6 @@ async def process_file_in_background(
             )
             return None
 
-        # Check if the file is an audio file
-        elif filename.lower().endswith(
-            (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
-        ):
-            # Update notification: parsing stage (transcription)
-            if notification:
-                await (
-                    NotificationService.document_processing.notify_processing_progress(
-                        session,
-                        notification,
-                        stage="parsing",
-                        stage_message="Transcribing audio",
-                    )
-                )
-
-            await task_logger.log_task_progress(
-                log_entry,
-                f"Processing audio file for transcription: {filename}",
-                {"file_type": "audio", "processing_stage": "starting_transcription"},
-            )
-
-            # Determine STT service type
-            stt_service_type = (
-                "local"
-                if app_config.STT_SERVICE
-                and app_config.STT_SERVICE.startswith("local/")
-                else "external"
-            )
-
-            # Check if using local STT service
-            if stt_service_type == "local":
-                # Use local Faster-Whisper for transcription
-                from app.services.stt_service import stt_service
-
-                try:
-                    result = stt_service.transcribe_file(file_path)
-                    transcribed_text = result.get("text", "")
-
-                    if not transcribed_text:
-                        raise ValueError("Transcription returned empty text")
-
-                    # Add metadata about the transcription
-                    transcribed_text = (
-                        f"# Transcription of {filename}\n\n{transcribed_text}"
-                    )
-                except Exception as e:
-                    raise HTTPException(
-                        status_code=422,
-                        detail=f"Failed to transcribe audio file {filename}: {e!s}",
-                    ) from e
-
-                await task_logger.log_task_progress(
-                    log_entry,
-                    f"Local STT transcription completed: {filename}",
-                    {
-                        "processing_stage": "local_transcription_complete",
-                        "language": result.get("language"),
-                        "confidence": result.get("language_probability"),
-                        "duration": result.get("duration"),
-                    },
-                )
-            else:
-                # Use LiteLLM for audio transcription
-                with open(file_path, "rb") as audio_file:
-                    transcription_kwargs = {
-                        "model": app_config.STT_SERVICE,
-                        "file": audio_file,
-                        "api_key": app_config.STT_SERVICE_API_KEY,
-                    }
-                    if app_config.STT_SERVICE_API_BASE:
-                        transcription_kwargs["api_base"] = (
-                            app_config.STT_SERVICE_API_BASE
-                        )
-
-                    transcription_response = await atranscription(
-                        **transcription_kwargs
-                    )
-
-                    # Extract the transcribed text
-                    transcribed_text = transcription_response.get("text", "")
-
-                    if not transcribed_text:
-                        raise ValueError("Transcription returned empty text")
empty text") - - # Add metadata about the transcription - transcribed_text = ( - f"# Transcription of {filename}\n\n{transcribed_text}" - ) - - await task_logger.log_task_progress( - log_entry, - f"Transcription completed, creating document: {filename}", - { - "processing_stage": "transcription_complete", - "transcript_length": len(transcribed_text), - }, - ) - - # Update notification: chunking stage - if notification: - await ( - NotificationService.document_processing.notify_processing_progress( - session, notification, stage="chunking" - ) - ) - - # Clean up the temp file - try: - os.unlink(file_path) - except Exception as e: - print("Error deleting temp file", e) - pass - - # Process transcription as markdown document - result = await add_received_markdown_file_document( - session, filename, transcribed_text, search_space_id, user_id, connector - ) - - if connector: - await _update_document_from_connector(result, connector, session) - - if result: - await task_logger.log_task_success( - log_entry, - f"Successfully transcribed and processed audio file: {filename}", - { - "document_id": result.id, - "content_hash": result.content_hash, - "file_type": "audio", - "transcript_length": len(transcribed_text), - "stt_service": stt_service_type, - }, - ) - return result - else: - await task_logger.log_task_success( - log_entry, - f"Audio file transcript already exists (duplicate): {filename}", - {"duplicate_detected": True, "file_type": "audio"}, - ) - return None - else: # Import page limit service from app.services.page_limit_service import ( diff --git a/financegpt_backend/check_db.py b/financegpt_backend/check_db.py new file mode 100644 index 0000000..2034569 --- /dev/null +++ b/financegpt_backend/check_db.py @@ -0,0 +1,44 @@ +import psycopg2 + +# Connect to database +conn = psycopg2.connect( + host="localhost", + port=5432, + database="financegpt", + user="postgres", + password="postgres" +) + +cur = conn.cursor() + +# Count accounts +cur.execute("SELECT COUNT(*) FROM investment_accounts;") +account_count = cur.fetchone()[0] +print(f"Investment Accounts: {account_count}") + +# Count holdings +cur.execute("SELECT COUNT(*) FROM investment_holdings;") +holding_count = cur.fetchone()[0] +print(f"Investment Holdings: {holding_count}") + +# Get total value +cur.execute("SELECT SUM(market_value) FROM investment_holdings;") +total_value = cur.fetchone()[0] or 0 +print(f"Total Market Value: ${total_value:,.2f}") + +# List accounts if any +if account_count > 0: + cur.execute("SELECT account_name, total_value FROM investment_accounts;") + print("\nAccounts:") + for row in cur.fetchall(): + print(f" - {row[0]}: ${row[1]:,.2f}") + +# List holdings if any +if holding_count > 0: + cur.execute("SELECT symbol, quantity, market_value FROM investment_holdings;") + print("\nHoldings:") + for row in cur.fetchall(): + print(f" - {row[0]}: qty={row[1]}, value=${row[2]:,.2f}") + +cur.close() +conn.close() diff --git a/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO b/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO index a5212ee..0627bb2 100644 --- a/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO +++ b/financegpt_backend/financegpt_new_backend.egg-info/PKG-INFO @@ -44,6 +44,10 @@ Requires-Dist: langchain-community>=0.3.31 Requires-Dist: langchain-unstructured>=1.0.0 Requires-Dist: litellm>=1.80.10 Requires-Dist: langchain-litellm>=0.3.5 +Requires-Dist: yfinance>=0.2.48 +Requires-Dist: instructor>=1.7.0 +Requires-Dist: pdfplumber>=0.11.0 +Requires-Dist: openpyxl>=3.1.0 
 Requires-Dist: ofxparse>=0.21
 Requires-Dist: pdfplumber>=0.11.0
 Requires-Dist: plaid-python>=28.0.0
diff --git a/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt b/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt
index b74c7ca..58fe279 100644
--- a/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt
+++ b/financegpt_backend/financegpt_new_backend.egg-info/SOURCES.txt
@@ -1,6 +1,7 @@
 pyproject.toml
 alembic/env.py
 alembic/versions/0_initial_schema.py
+alembic/versions/1_add_investment_holdings_tables.py
 app/__init__.py
 app/app.py
 app/celery_app.py
@@ -17,6 +18,7 @@ app/agents/new_chat/utils.py
 app/agents/new_chat/tools/__init__.py
 app/agents/new_chat/tools/display_image.py
 app/agents/new_chat/tools/find_subscriptions.py
+app/agents/new_chat/tools/investment_holdings.py
 app/agents/new_chat/tools/knowledge_base.py
 app/agents/new_chat/tools/link_preview.py
 app/agents/new_chat/tools/mcp_client.py
@@ -35,6 +37,7 @@ app/agents/podcaster/nodes.py
 app/agents/podcaster/prompts.py
 app/agents/podcaster/state.py
 app/agents/podcaster/utils.py
+app/agents/tools/investment_tools.py
 app/config/__init__.py
 app/config/uvicorn.py
 app/connectors/google_drive/__init__.py
@@ -62,6 +65,7 @@ app/routes/composio_routes.py
 app/routes/documents_routes.py
 app/routes/editor_routes.py
 app/routes/financegpt_docs_routes.py
+app/routes/investments.py
 app/routes/logs_routes.py
 app/routes/new_chat_routes.py
 app/routes/new_llm_config_routes.py
@@ -82,6 +86,7 @@ app/schemas/discord_auth_credentials.py
 app/schemas/documents.py
 app/schemas/financegpt_docs.py
 app/schemas/google_drive.py
+app/schemas/investments.py
 app/schemas/linear_auth_credentials.py
 app/schemas/logs.py
 app/schemas/new_chat.py
@@ -100,7 +105,9 @@ app/services/chat_session_state_service.py
 app/services/composio_service.py
 app/services/connector_service.py
 app/services/docling_service.py
+app/services/investment_csv_parser.py
 app/services/kokoro_tts_service.py
+app/services/llm_document_extractor.py
 app/services/llm_service.py
 app/services/new_streaming_service.py
 app/services/notification_service.py
@@ -109,6 +116,7 @@ app/services/plaid_service.py
 app/services/reranker_service.py
 app/services/stt_service.py
 app/services/task_logging_service.py
+app/services/yahoo_finance_enrichment.py
 app/tasks/__init__.py
 app/tasks/composio_indexer.py
 app/tasks/financegpt_docs_indexer.py
diff --git a/financegpt_backend/financegpt_new_backend.egg-info/requires.txt b/financegpt_backend/financegpt_new_backend.egg-info/requires.txt
index 010f4c4..c840bdc 100644
--- a/financegpt_backend/financegpt_new_backend.egg-info/requires.txt
+++ b/financegpt_backend/financegpt_new_backend.egg-info/requires.txt
@@ -39,6 +39,9 @@ langchain-community>=0.3.31
 langchain-unstructured>=1.0.0
 litellm>=1.80.10
 langchain-litellm>=0.3.5
+yfinance>=0.2.48
+instructor>=1.7.0
+openpyxl>=3.1.0
 ofxparse>=0.21
 pdfplumber>=0.11.0
 plaid-python>=28.0.0
diff --git a/financegpt_backend/pyproject.toml b/financegpt_backend/pyproject.toml
index f9cd127..d6fe159 100644
--- a/financegpt_backend/pyproject.toml
+++ b/financegpt_backend/pyproject.toml
@@ -33,7 +33,7 @@ dependencies = [
     "static-ffmpeg>=2.13",
     "tavily-python>=0.3.2",
    "unstructured-client>=0.30.0",
-    # "unstructured[pdf,docx,xlsx]>=0.17.0", # Commented: pulled in by langchain-unstructured, avoid version conflicts
+    "unstructured[all-docs]==0.17.2", # Pin to 0.17.2 for Python 3.12 compatibility (0.18+ requires llvmlite which only supports Python <3.10)
     "uvicorn[standard]>=0.34.0",
"validators>=0.34.0", "youtube-transcript-api>=1.0.3", @@ -48,6 +48,10 @@ dependencies = [ "langchain-unstructured>=1.0.0", "litellm>=1.80.10", "langchain-litellm>=0.3.5", + "yfinance>=0.2.48", + "instructor>=1.7.0", + "pdfplumber>=0.11.0", + "openpyxl>=3.1.0", "ofxparse>=0.21", "pdfplumber>=0.11.0", "plaid-python>=28.0.0", diff --git a/financegpt_web/.env.example b/financegpt_web/.env.example index d390a0c..193302b 100644 --- a/financegpt_web/.env.example +++ b/financegpt_web/.env.example @@ -1,6 +1,6 @@ NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000 NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL or GOOGLE -NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING +NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED # Electric SQL NEXT_PUBLIC_ELECTRIC_URL=http://localhost:5133 @@ -10,4 +10,4 @@ NEXT_PUBLIC_ELECTRIC_AUTH_MODE=insecure DATABASE_URL=postgresql://postgres:[YOUR-PASSWORD]@db.sdsf.supabase.co:5432/postgres # Obsidian flag for cloud version (optional) -NEXT_PUBLIC_DEPLOYMENT_MODE="self-hosted" or "cloud" \ No newline at end of file +NEXT_PUBLIC_DEPLOYMENT_MODE="self-hosted" \ No newline at end of file