diff --git a/.env.example b/.env.example index 399ac9b2b..a38eba5b6 100644 --- a/.env.example +++ b/.env.example @@ -53,6 +53,11 @@ APP_FULL_URL= QDRANT_FULL_URL= ADDITIONAL_CORS_ORIGINS= +# Posthog Configuration +POSTHOG_API_KEY=phc_your_api_key_here +POSTHOG_HOST=https://app.posthog.com +ANALYTICS_ENABLED=true + # Other Settings PROJECT_NAME=Airweave LOG_LEVEL=INFO diff --git a/backend/airweave/analytics/README.md b/backend/airweave/analytics/README.md new file mode 100644 index 000000000..a53e3b4a8 --- /dev/null +++ b/backend/airweave/analytics/README.md @@ -0,0 +1,293 @@ +# Airweave Analytics Module + +This module provides PostHog analytics integration for Airweave, enabling comprehensive tracking of user behavior, business metrics, and system performance. + +## 🏗️ Architecture + +The analytics module is organized into several components: + +- **`service.py`**: Core PostHog integration service +- **`decorators/`**: Decorators for automatic tracking of API endpoints and operations +- **`events/`**: Business event tracking classes for high-level metrics +- **`config.py`**: Analytics configuration (integrated into core config) + +## 🚀 Quick Start + +### 1. Environment Setup + +Add these variables to your `.env` file: + +```bash +# PostHog Configuration +POSTHOG_API_KEY=phc_your_api_key_here +POSTHOG_HOST=https://app.posthog.com +ANALYTICS_ENABLED=true +``` + +### 2. Basic Usage + +```python +from airweave.analytics import analytics, business_events + +# Track a custom event +analytics.track_event( + event_name="custom_event", + distinct_id="user_123", + properties={"key": "value"} +) + +# Track business events +business_events.track_organization_created( + organization_id=org_id, + user_id=user_id, + properties={"plan": "trial"} +) +``` + +### 3. 
Using Decorators + +```python +from airweave.analytics import track_api_endpoint, track_search_operation + +@track_api_endpoint("create_collection") +async def create_collection(ctx: ApiContext, ...): + # Your endpoint logic + pass + +@track_search_operation() +async def search_collection(ctx: ApiContext, query: str, ...): + # Your search logic + pass +``` + +## 📊 Complete Analytics Events Overview + +### API Events +- **`api_call`**: Successful API calls with timing and context +- **`api_call_error`**: Failed API calls with error details and status codes + +**Covered Endpoints:** +- `create_organization` - Organization creation +- `list_collections` - Collection listing +- `create_collection` - Collection creation +- `create_source_connection` - Source connection setup +- `run_sync` - Sync execution +- `search` - Basic search queries +- `search_advanced` - Advanced search queries + +### Search Events +- **`search_query`**: Successful search operations with query analysis +- **`search_query_error`**: Failed search operations with error details + +### Business Events +- **`organization_created`**: New organization signup +- **`collection_created`**: New collection creation +- **`source_connection_created`**: New source integration + +### Sync Events +- **`sync_completed`**: Successful sync job completion with entity counts +- **`entities_synced_by_type`**: Granular entity tracking per sync and entity type + +## 🎯 Dashboard Strategy + +### Dashboard 1: Airweave Overview +**Purpose:** High-level business metrics and system health +**Key Metrics:** +- Query volume over time (weekly/monthly) +- Query response times +- Popular data sources +- Error rates by endpoint +- Total entities in sync +- Query volume per organization + +### Dashboard 2: User Journey +**Purpose:** Track user progression and identify drop-off points +**Key Metrics:** +- User funnel: org created → collection created → source added → first search +- Time to first search ("time to wow") +- Feature 
adoption rates +- User retention metrics + +### Dashboard 3: Syncing & Storage +**Purpose:** Monitor sync performance and storage usage +**Key Metrics:** +- Sync success/error rates +- Entities synced per sync configuration +- Storage usage by organization +- Sync performance trends +- Entity type distribution + +### Dashboard 4: Performance & Errors +**Purpose:** System reliability and performance monitoring +**Key Metrics:** +- API error rates by endpoint +- Search error rates +- Sync error rates +- Performance trends +- Error patterns and troubleshooting + +### Dashboard 5: Advanced Analytics +**Purpose:** Deep insights and custom analysis +**Key Metrics:** +- Query patterns and complexity +- User behavior analysis +- Integration health scores +- Custom business metrics + +## 📈 PostHog Widget Configurations + +### Overview Dashboard Widgets +1. **Query Volume Over Time** + - Event: `search_query` + - Type: Line Chart + - Property: Count + - Time Range: Last 30 days + +2. **Query Response Times** + - Event: `search_query` + - Type: Line Chart + - Property: `duration_ms` (Average) + - Time Range: Last 7 days + +3. **Error Rate by Endpoint** + - Event: `api_call_error` + - Type: Bar Chart + - Property: Count + - Breakdown: `endpoint` + - Time Range: Last 7 days + +### User Journey Dashboard Widgets +1. **User Funnel** + - Events: `organization_created` → `collection_created` → `source_connection_created` → `search_query` + - Type: Funnel + - Time Range: Last 30 days + +2. **Time to First Search** + - Event: `search_query` + - Type: Histogram (if supported) or Line Chart + - Property: `searched_at` + - Time Range: Last 30 days + +### Syncing Dashboard Widgets +1. **Sync Success Rate** + - Event: `sync_completed` + - Type: Line Chart + - Property: Count + - Time Range: Last 30 days + +2. **Entities Synced per Sync** + - Event: `sync_completed` + - Type: Bar Chart + - Property: `total_entities` (Sum) + - Breakdown: `sync_id` + - Time Range: Last 7 days + +3. 
**Storage Usage by Organization** + - Event: `entities_synced_by_type` + - Type: Bar Chart + - Property: `entity_count` (Sum) + - Breakdown: `organization_name` + - Time Range: Last 7 days + +## 🔧 Configuration + +The analytics module respects these configuration options: + +- `POSTHOG_API_KEY`: Your PostHog API key (required) +- `POSTHOG_HOST`: PostHog host URL (default: https://app.posthog.com) +- `ANALYTICS_ENABLED`: Enable/disable analytics (default: true) +- `ENVIRONMENT`: Deployment environment - added as property to all events + +**Important**: Analytics events are emitted when `ANALYTICS_ENABLED=true`. Each event includes an `environment` property allowing you to filter by environment in PostHog dashboards. Control which environments emit events via their respective environment files. + +### Environment Configuration Examples + +```bash +# Production environment (.env.prod) +ANALYTICS_ENABLED=true +ENVIRONMENT=prd + +# Development environment (.env.dev) +ANALYTICS_ENABLED=true +ENVIRONMENT=dev + +# Local development (.env.local) +ANALYTICS_ENABLED=false +ENVIRONMENT=local + +# Testing (.env.test) +ANALYTICS_ENABLED=false +ENVIRONMENT=test +``` + +### PostHog Dashboard Filtering + +- **Production Only**: `environment = "prd"` +- **All Environments**: No filter +- **Exclude Local**: `environment != "local"` +- **Development Only**: `environment = "dev"` + +## 💡 Best Practices + +### 1. Use Decorators for Automatic Tracking +```python +@track_api_endpoint("endpoint_name") +async def my_endpoint(ctx: ApiContext, ...): + # Automatically tracks timing, errors, and context + pass +``` + +### 2. Track Business Events at Key Milestones +```python +# Track when user completes onboarding +business_events.track_first_sync_completed(ctx, sync_id, entities_count) +``` + +### 3. 
Include Rich Context +```python +analytics.track_event( + event_name="custom_event", + distinct_id=user_id, + properties={ + "organization_name": ctx.organization.name, + "plan": ctx.organization.plan, + "feature": "advanced_search" + }, + groups={"organization": str(ctx.organization.id)} +) +``` + +### 4. Handle Errors Gracefully +The analytics service automatically handles PostHog errors and logs them without affecting your application. + +## 🔒 Privacy & Compliance + +- All user data is sent to PostHog (ensure compliance with your privacy policy) +- User IDs are hashed/obfuscated as needed +- Sensitive data should not be included in event properties +- Consider data retention policies in PostHog + +## 🚨 Troubleshooting + +### Common Issues + +1. **Events not appearing in PostHog** + - Check `POSTHOG_API_KEY` is set correctly + - Verify `ANALYTICS_ENABLED=true` + - Check logs for PostHog errors + +2. **High event volume** + - PostHog free tier: 1M events/month + - Consider sampling for high-volume events + - Use `ANALYTICS_ENABLED=false` to disable + +3. 
**Performance impact** + - Analytics calls are async and non-blocking + - Errors are logged but don't affect application flow + - Consider batching for high-frequency events + +## 📚 Additional Resources + +- [PostHog Documentation](https://posthog.com/docs) +- [PostHog Python SDK](https://posthog.com/docs/libraries/python) +- [Airweave Analytics Examples](analytics_integration_example.py) diff --git a/backend/airweave/analytics/__init__.py b/backend/airweave/analytics/__init__.py new file mode 100644 index 000000000..6e3f701b9 --- /dev/null +++ b/backend/airweave/analytics/__init__.py @@ -0,0 +1,13 @@ +"""Analytics module for PostHog integration.""" + +from .decorators.api import track_api_endpoint +from .decorators.search import track_search_operation +from .events.business_events import business_events +from .service import analytics + +__all__ = [ + "analytics", + "business_events", + "track_api_endpoint", + "track_search_operation", +] diff --git a/backend/airweave/analytics/decorators/__init__.py b/backend/airweave/analytics/decorators/__init__.py new file mode 100644 index 000000000..c7f85c658 --- /dev/null +++ b/backend/airweave/analytics/decorators/__init__.py @@ -0,0 +1,9 @@ +"""Decorators for analytics tracking.""" + +from .api import track_api_endpoint +from .search import track_search_operation + +__all__ = [ + "track_api_endpoint", + "track_search_operation", +] diff --git a/backend/airweave/analytics/decorators/api.py b/backend/airweave/analytics/decorators/api.py new file mode 100644 index 000000000..6a50341ac --- /dev/null +++ b/backend/airweave/analytics/decorators/api.py @@ -0,0 +1,125 @@ +"""Decorators for tracking API endpoint metrics.""" + +import asyncio +import time +from functools import wraps +from typing import Any, Callable, Optional, TypeVar + +from fastapi import HTTPException + +from airweave.analytics.service import analytics + +F = TypeVar("F", bound=Callable[..., Any]) + + +def track_api_endpoint(event_name: Optional[str] = None, 
def track_api_endpoint(event_name: Optional[str] = None, include_timing: bool = True):
    """Decorator to track API endpoint calls with performance metrics.

    Works on both async and sync endpoints. The request's ApiContext is
    detected as the first keyword argument exposing both ``user`` and
    ``organization`` attributes; when no context is found, no event is
    emitted and the endpoint behaves as if undecorated.

    Emits ``api_call`` on success and ``api_call_error`` on failure, always
    re-raising the original exception.

    Args:
    ----
        event_name: Custom event name (defaults to function name)
        include_timing: Whether to include timing metrics
    """

    def _extract_ctx(kwargs):
        """Return the ApiContext-like kwarg, or None when absent."""
        for value in kwargs.values():
            if hasattr(value, "user") and hasattr(value, "organization"):
                return value
        return None

    def _emit(ctx, endpoint, status_code, error, start_time):
        """Send the api_call / api_call_error event for one invocation."""
        properties = {
            "endpoint": endpoint,
            "status_code": status_code,
            "auth_method": getattr(ctx, "auth_method", "unknown"),
            "organization_name": getattr(ctx.organization, "name", "unknown"),
        }
        if start_time is not None:
            properties["duration_ms"] = (time.time() - start_time) * 1000
        if error:
            properties["error"] = error

        analytics.track_event(
            event_name=f"api_call{'_error' if error else ''}",
            distinct_id=str(ctx.user.id) if ctx.user else f"api_key_{ctx.organization.id}",
            properties=properties,
            groups={"organization": str(ctx.organization.id)},
        )

    def decorator(func: Callable) -> Callable:
        # Resolved once at decoration time; identical for every call.
        endpoint = event_name or func.__name__

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time() if include_timing else None
            ctx = _extract_ctx(kwargs)
            error = None
            status_code = 200
            try:
                return await func(*args, **kwargs)
            except HTTPException as e:
                error = e.detail
                status_code = e.status_code
                raise
            except Exception as e:
                error = str(e)
                status_code = 500
                raise
            finally:
                # Only track when we could attribute the call to a context.
                if ctx:
                    _emit(ctx, endpoint, status_code, error, start_time)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time() if include_timing else None
            ctx = _extract_ctx(kwargs)
            error = None
            status_code = 200
            try:
                return func(*args, **kwargs)
            except HTTPException as e:
                error = e.detail
                status_code = e.status_code
                raise
            except Exception as e:
                error = str(e)
                status_code = 500
                raise
            finally:
                if ctx:
                    _emit(ctx, endpoint, status_code, error, start_time)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
def _distinct_id(ctx) -> str:
    """PostHog distinct_id for a request: the user id, or an API-key pseudo-id."""
    return str(ctx.user.id) if ctx.user else f"api_key_{ctx.organization.id}"


def track_search_operation():
    """Decorator to track search operations with query analysis.

    Emits ``search_query`` on success (with duration, result counts and
    score statistics) and ``search_query_error`` on failure, then re-raises.
    Attribution requires both an ApiContext kwarg and a query; otherwise the
    call passes through untracked. Only async callables are supported.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()

            # Locate the ApiContext: first kwarg exposing user + organization.
            ctx = None
            for value in kwargs.values():
                if hasattr(value, "user") and hasattr(value, "organization"):
                    ctx = value
                    break  # fixed: stop at first match, mirroring api.py

            # The query may arrive as a plain kwarg or inside a request body.
            query = kwargs.get("query")
            collection_id = kwargs.get("readable_id")
            search_request = kwargs.get("search_request")
            if search_request is not None and hasattr(search_request, "query"):
                query = search_request.query

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                if ctx and query:
                    analytics.track_event(
                        event_name="search_query_error",
                        distinct_id=_distinct_id(ctx),
                        properties={
                            "query_length": len(query),
                            "collection_id": collection_id,
                            "duration_ms": (time.time() - start_time) * 1000,
                            "error": str(e),
                            "error_type": type(e).__name__,
                        },
                        groups={"organization": str(ctx.organization.id)},
                    )
                raise

            if ctx and query:
                properties = {
                    "query_length": len(query),
                    "collection_id": collection_id,
                    "duration_ms": (time.time() - start_time) * 1000,
                    "results_count": len(result.results) if hasattr(result, "results") else 0,
                    "response_type": str(result.response_type)
                    if hasattr(result, "response_type")
                    else None,
                    "status": str(result.status) if hasattr(result, "status") else "success",
                    "organization_name": getattr(ctx.organization, "name", "unknown"),
                }

                # Result-quality metrics from per-hit scores, when available.
                if hasattr(result, "results") and result.results:
                    scores = [r.get("score", 0) for r in result.results if isinstance(r, dict)]
                    if scores:
                        properties["avg_score"] = sum(scores) / len(scores)
                        properties["max_score"] = max(scores)
                        properties["min_score"] = min(scores)

                analytics.track_event(
                    event_name="search_query",
                    distinct_id=_distinct_id(ctx),
                    properties=properties,
                    groups={"organization": str(ctx.organization.id)},
                )

            return result

        return wrapper

    return decorator
class BusinessEventTracker:
    """Tracks high-level business metrics and organizational events.

    Thin convenience layer over the analytics service that standardizes
    distinct-id selection and organization group attribution for the
    product's key lifecycle events.
    """

    @staticmethod
    def _ctx_distinct_id(ctx) -> str:
        """Distinct id for a request context: the user id, or an API-key pseudo-id."""
        return str(ctx.user.id) if ctx.user else f"api_key_{ctx.organization.id}"

    @staticmethod
    def _track_for_ctx(ctx, event_name: str, properties: Dict[str, Any]) -> None:
        """Emit *event_name* attributed to ctx's user and organization group.

        Adds ``organization_name`` to the properties (without overriding a
        caller-supplied value) so every ctx-based event is filterable by org.
        """
        properties.setdefault("organization_name", getattr(ctx.organization, "name", "unknown"))
        analytics.track_event(
            event_name=event_name,
            distinct_id=BusinessEventTracker._ctx_distinct_id(ctx),
            properties=properties,
            groups={"organization": str(ctx.organization.id)},
        )

    @staticmethod
    def track_organization_created(
        organization_id: UUID, user_id: UUID, properties: Optional[Dict[str, Any]] = None
    ):
        """Track when a new organization is created.

        Args:
        ----
            organization_id: ID of the created organization
            user_id: ID of the user who created it
            properties: Additional properties
        """
        analytics.track_event(
            event_name="organization_created",
            distinct_id=str(user_id),
            properties={"organization_id": str(organization_id), **(properties or {})},
            groups={"organization": str(organization_id)},
        )

    @staticmethod
    def track_collection_created(ctx, collection_id: UUID, collection_name: str):
        """Track when a new collection is created.

        Args:
        ----
            ctx: API context containing user and organization info
            collection_id: ID of the created collection
            collection_name: Name of the collection
        """
        BusinessEventTracker._track_for_ctx(
            ctx,
            "collection_created",
            {"collection_id": str(collection_id), "collection_name": collection_name},
        )

    @staticmethod
    def track_source_connection_created(ctx, connection_id: UUID, source_short_name: str):
        """Track when a new source connection is created.

        Args:
        ----
            ctx: API context containing user and organization info
            connection_id: ID of the created connection
            source_short_name: Short name of the source (e.g., 'slack', 'notion')
        """
        BusinessEventTracker._track_for_ctx(
            ctx,
            "source_connection_created",
            {"connection_id": str(connection_id), "source_type": source_short_name},
        )

    @staticmethod
    def track_first_sync_completed(ctx, sync_id: UUID, entities_processed: int):
        """Track when an organization completes their first sync.

        Args:
        ----
            ctx: API context containing user and organization info
            sync_id: ID of the sync operation
            entities_processed: Number of entities processed
        """
        BusinessEventTracker._track_for_ctx(
            ctx,
            "first_sync_completed",
            {"sync_id": str(sync_id), "entities_processed": entities_processed},
        )

    @staticmethod
    def track_usage_limit_approached(
        organization_id: UUID, limit_type: str, current_usage: int, limit: int
    ):
        """Track when an organization approaches usage limits.

        Args:
        ----
            organization_id: ID of the organization
            limit_type: Type of limit (e.g., 'api_calls', 'storage_gb')
            current_usage: Current usage count
            limit: Limit threshold
        """
        analytics.track_event(
            event_name="usage_limit_approached",
            # System event with no acting user: attribute to the org itself.
            distinct_id=f"org_{organization_id}",
            properties={
                "limit_type": limit_type,
                "current_usage": current_usage,
                "limit": limit,
                "usage_percentage": (current_usage / limit) * 100 if limit > 0 else 0,
            },
            groups={"organization": str(organization_id)},
        )

    @staticmethod
    def track_user_login(ctx, login_method: str = "unknown"):
        """Track user login events.

        Args:
        ----
            ctx: API context containing user and organization info
            login_method: Method used for login (e.g., 'oauth', 'api_key')
        """
        BusinessEventTracker._track_for_ctx(ctx, "user_login", {"login_method": login_method})

    @staticmethod
    def track_sync_started(ctx, sync_id: UUID, source_type: str, collection_id: UUID):
        """Track when a sync operation starts.

        Args:
        ----
            ctx: API context containing user and organization info
            sync_id: ID of the sync operation
            source_type: Type of source being synced
            collection_id: ID of the collection being synced
        """
        BusinessEventTracker._track_for_ctx(
            ctx,
            "sync_started",
            {
                "sync_id": str(sync_id),
                "source_type": source_type,
                "collection_id": str(collection_id),
            },
        )

    @staticmethod
    def track_sync_completed(ctx, sync_id: UUID, entities_processed: int, duration_ms: int):
        """Track when a sync operation completes successfully.

        Args:
        ----
            ctx: API context containing user and organization info
            sync_id: ID of the sync operation
            entities_processed: Number of entities processed
            duration_ms: Duration of sync in milliseconds
        """
        BusinessEventTracker._track_for_ctx(
            ctx,
            "sync_completed",
            {
                "sync_id": str(sync_id),
                "entities_processed": entities_processed,
                "duration_ms": duration_ms,
            },
        )

    @staticmethod
    def track_sync_failed(ctx, sync_id: UUID, error: str, duration_ms: int):
        """Track when a sync operation fails.

        Args:
        ----
            ctx: API context containing user and organization info
            sync_id: ID of the sync operation
            error: Error message
            duration_ms: Duration before failure in milliseconds
        """
        BusinessEventTracker._track_for_ctx(
            ctx,
            "sync_failed",
            {"sync_id": str(sync_id), "error": error, "duration_ms": duration_ms},
        )


# Global instance
business_events = BusinessEventTracker()
+ """ + + def __init__(self): + """Initialize the analytics service with PostHog configuration.""" + # Enable analytics based on ANALYTICS_ENABLED setting + self.enabled = bool(settings.POSTHOG_API_KEY and settings.ANALYTICS_ENABLED) + self.logger = logger.with_context(component="analytics") + + if self.enabled: + posthog.api_key = settings.POSTHOG_API_KEY + posthog.host = settings.POSTHOG_HOST or "https://app.posthog.com" + self.logger.info(f"PostHog analytics initialized (environment: {settings.ENVIRONMENT})") + else: + self.logger.info( + f"PostHog analytics disabled (environment: {settings.ENVIRONMENT}, " + f"api_key: {bool(settings.POSTHOG_API_KEY)}, " + f"enabled: {settings.ANALYTICS_ENABLED})" + ) + + def identify_user(self, user_id: str, properties: Dict[str, Any]) -> None: + """Identify a user with properties. + + Args: + ---- + user_id: Unique identifier for the user + properties: User properties to set + """ + if not self.enabled: + return + + try: + posthog.capture(distinct_id=user_id, event="$identify", properties={"$set": properties}) + self.logger.debug(f"User identified: {user_id}") + except Exception as e: + self.logger.error(f"Failed to identify user {user_id}: {e}") + + def track_event( + self, + event_name: str, + distinct_id: str, + properties: Optional[Dict[str, Any]] = None, + groups: Optional[Dict[str, str]] = None, + ) -> None: + """Track an event with optional properties and groups. 
+ + Args: + ---- + event_name: Name of the event to track + distinct_id: Unique identifier for the user/entity + properties: Event properties + groups: Group associations (e.g., organization) + """ + if not self.enabled: + return + + try: + # Add environment to all events for filtering + event_properties = properties or {} + event_properties["environment"] = settings.ENVIRONMENT + + posthog.capture( + distinct_id=distinct_id, + event=event_name, + properties=event_properties, + groups=groups or {}, + ) + self.logger.debug(f"Event tracked: {event_name} for {distinct_id}") + except Exception as e: + self.logger.error(f"Failed to track event {event_name}: {e}") + + def set_group_properties( + self, group_type: str, group_key: str, properties: Dict[str, Any] + ) -> None: + """Set properties for a group (e.g., organization). + + Args: + ---- + group_type: Type of group (e.g., 'organization') + group_key: Unique identifier for the group + properties: Properties to set for the group + """ + if not self.enabled: + return + + try: + posthog.capture( + distinct_id=group_key, + event="$groupidentify", + properties={ + "$group_type": group_type, + "$group_key": group_key, + "$group_set": properties, + }, + ) + self.logger.debug(f"Group properties set: {group_type}:{group_key}") + except Exception as e: + self.logger.error(f"Failed to set group properties for {group_type}:{group_key}: {e}") + + +# Global analytics service instance +analytics = AnalyticsService() diff --git a/backend/airweave/api/v1/endpoints/collections.py b/backend/airweave/api/v1/endpoints/collections.py index dbd33e0db..84973acd3 100644 --- a/backend/airweave/api/v1/endpoints/collections.py +++ b/backend/airweave/api/v1/endpoints/collections.py @@ -10,6 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from airweave import crud, schemas +from airweave.analytics import business_events, track_api_endpoint, track_search_operation from airweave.api import deps from airweave.api.context import ApiContext 
from airweave.api.examples import ( @@ -41,6 +42,7 @@ "Finance data collection", ), ) +@track_api_endpoint("list_collections") async def list( skip: int = Query(0, description="Number of collections to skip for pagination"), limit: int = Query( @@ -61,6 +63,7 @@ async def list( @router.post("/", response_model=schemas.Collection) +@track_api_endpoint("create_collection") async def create( collection: schemas.CollectionCreate, db: AsyncSession = Depends(deps.get_db), @@ -78,6 +81,11 @@ async def create( # Create the collection collection_obj = await collection_service.create(db, collection_in=collection, ctx=ctx) + # Track business event + business_events.track_collection_created( + ctx=ctx, collection_id=collection_obj.id, collection_name=collection_obj.name + ) + # Increment usage after successful creation await guard_rail.increment(ActionType.COLLECTIONS) @@ -164,6 +172,7 @@ async def delete( response_model=schemas.SearchResponse, responses=create_search_response("raw_results", "Raw search results with metadata"), ) +@track_search_operation() async def search( readable_id: str = Path( ..., description="The unique readable identifier of the collection to search" @@ -263,6 +272,7 @@ async def search( response_model=schemas.SearchResponse, responses=create_search_response("completion_response", "Search with AI-generated completion"), ) +@track_search_operation() async def search_advanced( readable_id: str = Path( ..., description="The unique readable identifier of the collection to search" diff --git a/backend/airweave/api/v1/endpoints/organizations.py b/backend/airweave/api/v1/endpoints/organizations.py index 437a8c072..f3fe6b338 100644 --- a/backend/airweave/api/v1/endpoints/organizations.py +++ b/backend/airweave/api/v1/endpoints/organizations.py @@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from airweave import crud, schemas +from airweave.analytics import business_events, track_api_endpoint from airweave.api import deps from 
airweave.api.context import ApiContext from airweave.api.router import TrailingSlashRouter @@ -19,6 +20,7 @@ @router.post("/", response_model=schemas.Organization) +@track_api_endpoint("create_organization") async def create_organization( organization_data: schemas.OrganizationCreate, db: AsyncSession = Depends(deps.get_db), @@ -45,6 +47,17 @@ async def create_organization( db=db, org_data=organization_data, owner_user=user ) + # Track business event + business_events.track_organization_created( + organization_id=organization.id, + user_id=user.id, + properties={ + "plan": "trial", # Default plan for new organizations + "source": "signup", + "organization_name": organization.name, + }, + ) + return organization except Exception as e: raise HTTPException( diff --git a/backend/airweave/api/v1/endpoints/source_connections.py b/backend/airweave/api/v1/endpoints/source_connections.py index 717f74455..2059ee850 100644 --- a/backend/airweave/api/v1/endpoints/source_connections.py +++ b/backend/airweave/api/v1/endpoints/source_connections.py @@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from airweave import crud, schemas +from airweave.analytics import track_api_endpoint from airweave.api import deps from airweave.api.context import ApiContext from airweave.api.examples import ( @@ -100,6 +101,7 @@ async def get( @router.post("/", response_model=schemas.SourceConnection) +@track_api_endpoint("create_source_connection") async def create( *, db: AsyncSession = Depends(deps.get_db), diff --git a/backend/airweave/api/v1/endpoints/sync.py b/backend/airweave/api/v1/endpoints/sync.py index 79a9a7816..94904323c 100644 --- a/backend/airweave/api/v1/endpoints/sync.py +++ b/backend/airweave/api/v1/endpoints/sync.py @@ -10,6 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from airweave import crud, schemas +from airweave.analytics import track_api_endpoint from airweave.api import deps from airweave.api.context import ApiContext from airweave.api.router 
import TrailingSlashRouter @@ -170,6 +171,7 @@ async def delete_sync( @router.post("/{sync_id}/run", response_model=schemas.SyncJob) +@track_api_endpoint("run_sync") async def run_sync( *, db: AsyncSession = Depends(deps.get_db), diff --git a/backend/airweave/core/config.py b/backend/airweave/core/config.py index bcd51816f..ef448311c 100644 --- a/backend/airweave/core/config.py +++ b/backend/airweave/core/config.py @@ -135,6 +135,11 @@ class Settings(BaseSettings): RESEND_API_KEY: Optional[str] = None RESEND_FROM_EMAIL: Optional[str] = None + # PostHog Analytics Configuration + POSTHOG_API_KEY: Optional[str] = None + POSTHOG_HOST: str = "https://app.posthog.com" + ANALYTICS_ENABLED: bool = True + # Sync configuration SYNC_MAX_WORKERS: int = 100 SYNC_THREAD_POOL_SIZE: int = 100 diff --git a/backend/airweave/core/sync_job_service.py b/backend/airweave/core/sync_job_service.py index 682309942..a9fd92907 100644 --- a/backend/airweave/core/sync_job_service.py +++ b/backend/airweave/core/sync_job_service.py @@ -5,6 +5,7 @@ from uuid import UUID from airweave import crud, schemas +from airweave.analytics.service import analytics from airweave.api.context import ApiContext from airweave.core.datetime_utils import utc_now_naive from airweave.core.logging import logger @@ -144,9 +145,63 @@ async def update_status( f"Successfully updated sync job {sync_job_id} status to {db_status_value}" ) + # Track analytics for sync completion + if status == SyncJobStatus.COMPLETED and stats: + await self._track_sync_completion(sync_job_id, db_sync_job.sync_id, stats, ctx) + except Exception as e: logger.error(f"Failed to update sync job status: {e}") + async def _track_sync_completion( + self, sync_job_id: UUID, sync_id: UUID, stats: SyncProgressUpdate, ctx: ApiContext + ) -> None: + """Track analytics for sync completion with entity counts per sync and entity type.""" + try: + # Calculate total entities synced + total_entities = ( + stats.inserted + stats.updated + stats.deleted + 
stats.kept + stats.skipped + ) + + # Track sync completion event with sync_id + analytics.track_event( + event_name="sync_completed", + distinct_id=str(ctx.user.id) if ctx.user else f"api_key_{ctx.organization.id}", + properties={ + "sync_job_id": str(sync_job_id), + "sync_id": str(sync_id), + "total_entities": total_entities, + "entities_inserted": stats.inserted, + "entities_updated": stats.updated, + "entities_deleted": stats.deleted, + "entities_kept": stats.kept, + "entities_skipped": stats.skipped, + "organization_name": getattr(ctx.organization, "name", "unknown"), + }, + groups={"organization": str(ctx.organization.id)}, + ) + + # Track individual entity type counts for detailed analysis + if hasattr(stats, "entities_encountered") and stats.entities_encountered: + for entity_type, entity_count in stats.entities_encountered.items(): + user_id = str(ctx.user.id) if ctx.user else f"api_key_{ctx.organization.id}" + analytics.track_event( + event_name="entities_synced_by_type", + distinct_id=user_id, + properties={ + "sync_job_id": str(sync_job_id), + "sync_id": str(sync_id), + "entity_type": entity_type, + "entity_count": entity_count, + "organization_name": getattr(ctx.organization, "name", "unknown"), + }, + groups={"organization": str(ctx.organization.id)}, + ) + + logger.info(f"Tracked sync completion analytics for job {sync_job_id} (sync {sync_id})") + + except Exception as e: + logger.error(f"Failed to track sync completion analytics: {e}") + # Singleton instance sync_job_service = SyncJobService()