From cd3d688d8c2c6050f82b9fea3ecd89d10143e541 Mon Sep 17 00:00:00 2001 From: voynow Date: Wed, 9 Oct 2024 17:08:20 -0400 Subject: [PATCH] email on new activity --- .gitignore | 3 +- src/auth_manager.py | 31 +++- src/frontend_router.py | 71 +++++++++ src/lambda_function.py | 138 +----------------- src/supabase_client.py | 29 ++++ src/types/update_pipeline.py | 10 ++ src/{daily_pipeline.py => update_pipeline.py} | 73 +++++---- src/webhook_router.py | 46 ++++++ test.ipynb | 36 ++++- tests/test_full_week.py | 6 +- web/src/app/strava_webhook/route.tsx | 40 ++--- 11 files changed, 300 insertions(+), 183 deletions(-) create mode 100644 src/frontend_router.py create mode 100644 src/types/update_pipeline.py rename src/{daily_pipeline.py => update_pipeline.py} (67%) create mode 100644 src/webhook_router.py diff --git a/.gitignore b/.gitignore index 18f0ebb..10f918b 100644 --- a/.gitignore +++ b/.gitignore @@ -177,4 +177,5 @@ build/ Packages/ -tests/artifacts \ No newline at end of file +tests/artifacts +.vercel diff --git a/src/auth_manager.py b/src/auth_manager.py index fde6787..2650035 100644 --- a/src/auth_manager.py +++ b/src/auth_manager.py @@ -4,10 +4,11 @@ import jwt from dotenv import load_dotenv from stravalib.client import Client -from stravalib.model import Athlete -from src.supabase_client import get_user_auth, upsert_user_auth +from src.email_manager import send_alert_email +from src.supabase_client import get_user_auth, upsert_user, upsert_user_auth from src.types.user_auth_row import UserAuthRow +from src.types.user_row import UserRow load_dotenv() strava_client = Client() @@ -123,3 +124,29 @@ def get_strava_client(athlete_id: int) -> Client: """Interface for retrieving a Strava client with valid authentication""" user_auth = authenticate_athlete(athlete_id) return get_configured_strava_client(user_auth) + + +def signup(email: str, code: str) -> dict: + """ + Get authenticated user, upsert user with email and preferences + + :param email: user email + :param code: strava code + :return: jwt_token + """ + preferences = ( + "Looking for smart training recommendations to optimize my performance." + ) + send_alert_email( + subject="TrackFlow Alert: New Signup Attempt", + text_content=f"You have a new client {email=} attempting to signup with {preferences=}", + ) + user_auth = authenticate_with_code(code) + upsert_user( + UserRow( + athlete_id=user_auth.athlete_id, + email=email, + preferences=preferences, + ) + ) + return {"success": True, "jwt_token": user_auth.jwt_token} diff --git a/src/frontend_router.py b/src/frontend_router.py new file mode 100644 index 0000000..e9ad4e1 --- /dev/null +++ b/src/frontend_router.py @@ -0,0 +1,71 @@ +from typing import Callable, Dict, Optional + +import jwt + +from src import auth_manager +from src.supabase_client import ( + get_training_week, + get_user, + update_preferences, +) + + +def get_training_week_handler(athlete_id: str, payload: Optional[dict] = None) -> dict: + """Handle get_training_week request.""" + training_week = get_training_week(athlete_id) + return { + "success": True, + "training_week": training_week.json(), + } + + +def get_profile_handler(athlete_id: str, payload: Optional[dict] = None) -> dict: + """Handle get_profile request.""" + user = get_user(athlete_id) + athlete = auth_manager.get_strava_client(athlete_id).get_athlete() + return { + "success": True, + "profile": { + "firstname": athlete.firstname, + "lastname": athlete.lastname, + "profile": athlete.profile, + "email": user.email, + "preferences": user.preferences_json.json(), + "is_active": user.is_active, + }, + } + + +def update_preferences_handler(athlete_id: str, payload: Optional[dict] = None) -> dict: + """Handle update_preferences request.""" + if payload is None or "preferences" not in payload: + return {"success": False, "error": "Missing preferences in payload"} + update_preferences(athlete_id=athlete_id, preferences_json=payload["preferences"]) + return {"success": True} + + +METHOD_HANDLERS: Dict[str, Callable[[str, Optional[dict]], dict]] = { + "get_training_week": get_training_week_handler, + "get_profile": get_profile_handler, + "update_preferences": update_preferences_handler, +} + + +def handle_request(jwt_token: str, method: str, payload: Optional[dict] = None) -> dict: + """ + Handle various requests based on the provided method. + + :param jwt_token: JWT token for authentication + :param method: The method to be executed + :param payload: Optional dictionary with additional data + :return: Dictionary with the result of the operation + """ + try: + athlete_id = auth_manager.decode_jwt(jwt_token) + except jwt.DecodeError: + return {"success": False, "error": "Invalid JWT token"} + + if method in METHOD_HANDLERS: + return METHOD_HANDLERS[method](athlete_id, payload) + else: + return {"success": False, "error": f"Invalid method: {method}"} diff --git a/src/lambda_function.py b/src/lambda_function.py index 6d511cc..22ffbee 100644 --- a/src/lambda_function.py +++ b/src/lambda_function.py @@ -1,153 +1,30 @@ import logging import os import uuid -from typing import Optional -import jwt - -from src.auth_manager import authenticate_with_code, decode_jwt, get_strava_client -from src.daily_pipeline import daily_executor, webhook_executor +from src import auth_manager, frontend_router, update_pipeline, webhook_router from src.email_manager import send_alert_email -from src.supabase_client import ( - get_training_week, - get_user, - list_users, - update_preferences, - upsert_user, -) -from src.types.user_row import UserRow logger = logging.getLogger() logger.setLevel(logging.INFO) -def signup(email: str, code: str) -> dict: - """ - Get authenticated user, upsert user with email and preferences - - :param email: user email - :param code: strava code - :return: jwt_token - """ - preferences = ( - "Looking for smart training recommendations to optimize my performance." - ) - send_alert_email( - subject="TrackFlow Alert: New Signup Attempt", - text_content=f"You have a new client {email=} attempting to signup with {preferences=}", - ) - user_auth = authenticate_with_code(code) - upsert_user( - UserRow( - athlete_id=user_auth.athlete_id, - email=email, - preferences=preferences, - ) - ) - return {"success": True, "jwt_token": user_auth.jwt_token} - - -def handle_frontend_request( - jwt_token: str, method: str, payload: Optional[dict] = None -) -> dict: - """ - To be extended for other requests eventually - - Validate JWT, then return training week with coaching - - :param jwt_token: jwt_token - :param method: method - :param payload: optional dictionary with additional data - :return: dict with {"success": bool} - """ - try: - athlete_id = decode_jwt(jwt_token) - except jwt.DecodeError: - return {"success": False, "error": "Invalid JWT token"} - - if method == "get_training_week": - training_week = get_training_week(athlete_id) - return { - "success": True, - "training_week": training_week.json(), - } - elif method == "get_profile": - user = get_user(athlete_id) - athlete = get_strava_client(athlete_id).get_athlete() - return { - "success": True, - "profile": { - "firstname": athlete.firstname, - "lastname": athlete.lastname, - "profile": athlete.profile, - "email": user.email, - "preferences": user.preferences_json.json(), - "is_active": user.is_active, - }, - } - elif method == "update_preferences": - update_preferences( - athlete_id=athlete_id, preferences_json=payload["preferences"] - ) - return {"success": True} - else: - return {"success": False, "error": f"Invalid method: {method}"} - - -def handle_strava_webhook(event: dict) -> dict: - """ - Handle Strava webhook events for activities and athletes. - - :param event: Webhook event payload from Strava - :return: dict with {"success": bool} - """ - subscription_id = int(event.get("subscription_id")) - expected_subscription_id = int(os.environ["STRAVA_WEBHOOK_SUBSCRIPTION_ID"]) - if subscription_id != expected_subscription_id: - return { - "success": False, - "error": f"Invalid subscription ID: {event.get('subscription_id')}", - } - - if event.get("object_type") == "activity": - if event.get("aspect_type") == "create": - return webhook_executor(get_user(event.get("owner_id"))) - elif event.get("aspect_type") == "update": - return { - "success": True, - "message": f"Activity {event.get('object_id')} updated", - } - elif event.get("aspect_type") == "delete": - return { - "success": True, - "message": f"Activity {event.get('object_id')} deleted", - } - return {"success": False, "error": f"Unknown event type: {event}"} - - -def daily_exe_orchestrator() -> dict: - for user in list_users(): - if user.is_active: - daily_executor(user) - return {"success": True} - - def strategy_router(event: dict) -> dict: # Will fail on bad authenticate_with_code if event.get("email") and event.get("code"): - return signup( + return auth_manager.signup( email=event["email"], code=event["code"], ) # Will fail on bad authenticate_with_code elif event.get("code"): - user_auth = authenticate_with_code(event["code"]) + user_auth = auth_manager.authenticate_with_code(event["code"]) return {"success": True, "jwt_token": user_auth.jwt_token} elif event.get("jwt_token") and event.get("method"): - return handle_frontend_request( + return frontend_router.handle_request( jwt_token=event["jwt_token"], method=event["method"], payload=event.get("payload"), @@ -160,18 +37,17 @@ def strategy_router(event: dict) -> dict: and event.get("object_id") and event.get("owner_id") ): - return handle_strava_webhook(event) + return webhook_router.handle_request(event) # This will only run if triggered by NIGHTLY_EMAIL_TRIGGER_ARN elif ( event.get("resources") and event.get("resources")[0] == os.environ["NIGHTLY_EMAIL_TRIGGER_ARN"] ): - return daily_exe_orchestrator() + return update_pipeline.nightly_trigger_orchestrator() elif event.get("trigger_test_key") == os.environ["TRIGGER_TEST_KEY"]: - daily_executor(get_user(os.environ["JAMIES_ATHLETE_ID"])) - return {"success": True} + return update_pipeline.integration_test_executor() else: return {"success": False, "error": f"Unknown event type: {event}"} diff --git a/src/supabase_client.py b/src/supabase_client.py index f24c4b0..96ee1ad 100644 --- a/src/supabase_client.py +++ b/src/supabase_client.py @@ -1,6 +1,7 @@ import datetime import json import os +from datetime import timedelta, timezone from dotenv import load_dotenv from postgrest.base_request_builder import APIResponse @@ -218,3 +219,31 @@ def update_preferences(athlete_id: int, preferences_json: dict) -> APIResponse: .execute() ) return response + + +def has_user_updated_today(athlete_id: int) -> bool: + """ + Check if the user has received an update today. Where "today" is defined as + within the past 23 hours and 30 minutes (to account for any delays in + yesterday's evening update). + + :param athlete_id: The ID of the athlete + :return: True if the user has received an update today, False otherwise + """ + table = client.table("training_week") + response = ( + table.select("*") + .eq("athlete_id", athlete_id) + .order("created_at", desc=True) + .limit(1) + .execute() + ) + + if not response.data: + return False + + # "Has this user posted an activity in the last 23 hours and 30 minutes?" + time_diff = datetime.now(timezone.utc) - datetime.fromisoformat( + response.data[0]["created_at"] + ) + return time_diff < timedelta(hours=23, minutes=30) diff --git a/src/types/update_pipeline.py b/src/types/update_pipeline.py new file mode 100644 index 0000000..511a6c0 --- /dev/null +++ b/src/types/update_pipeline.py @@ -0,0 +1,10 @@ +from strenum import StrEnum + + +class ExeType(StrEnum): + NEW_WEEK = "new_week" + MID_WEEK = "mid_week" + + +class TrainingWeekUpdateError(Exception): + pass diff --git a/src/daily_pipeline.py b/src/update_pipeline.py similarity index 67% rename from src/daily_pipeline.py rename to src/update_pipeline.py index dffd282..92abe0e 100644 --- a/src/daily_pipeline.py +++ b/src/update_pipeline.py @@ -1,6 +1,6 @@ import logging -import traceback -from typing import Callable, Dict, Optional +import os +from typing import Callable, Dict from openai import APIResponse from stravalib.client import Client @@ -13,19 +13,19 @@ ) from src.auth_manager import get_strava_client from src.constants import COACH_ROLE -from src.email_manager import ( - send_alert_email, - send_email, - training_week_to_html, -) +from src.email_manager import send_email, training_week_to_html from src.mid_week_update import generate_mid_week_update from src.new_training_week import generate_new_training_week from src.supabase_client import ( get_training_week, get_training_week_test, + get_user, + has_user_updated_today, + list_users, upsert_training_week, ) from src.types.training_week import TrainingWeek +from src.types.update_pipeline import ExeType, TrainingWeekUpdateError from src.types.user_row import UserRow from src.utils import datetime_now_est @@ -33,7 +33,7 @@ logger.setLevel(logging.INFO) -def daily_generic_pipeline( +def training_week_update_pipeline( user: UserRow, pipeline_function: Callable[[UserRow, Client], TrainingWeek], email_subject: str = "TrackFlow 🏃‍♂️🎯", @@ -99,38 +99,57 @@ def mid_week_update_pipeline_test( def webhook_executor(user: UserRow) -> dict: """Silently updates db on every new activity""" - daily_generic_pipeline( + training_week_update_pipeline( user=user, pipeline_function=mid_week_update_pipeline, - send_email=lambda subject, html_content, to, sender={ - "name": "", - "email": "", - }: None, + email_subject="TrackFlow Update Inbound! 🏃‍♂️🎯", ) return {"success": True} -def daily_executor(user: UserRow) -> dict: +def training_week_update_executor(user: UserRow, exetype: ExeType) -> dict: """Decides between generating a new week or updating based on the day.""" try: - # Sunday is day 6 - if datetime_now_est().weekday() == 6: - daily_generic_pipeline( + if exetype == ExeType.NEW_WEEK: + training_week_update_pipeline( user=user, pipeline_function=new_training_week_pipeline, email_subject="Training Schedule Just Dropped 🏃‍♂️🎯", ) - else: - daily_generic_pipeline( + elif exetype == ExeType.MID_WEEK: + training_week_update_pipeline( user=user, pipeline_function=mid_week_update_pipeline, - email_subject="www.trackflow.xyz is live! 🏃‍♂️🎯", + email_subject="TrackFlow Update Inbound! 🏃‍♂️🎯", ) - return {"success": True} except Exception as e: - logger.error(f"Error processing user {user.athlete_id}: {e}") - send_alert_email( - subject="TrackFlow Alert: Error in Lambda Function", - text_content=f"Error for {user.email=} {e} with traceback: {traceback.format_exc()}", - ) - return {"success": False, "error": str(e)} + raise TrainingWeekUpdateError(f"Error processing user {user.athlete_id}: {e}") + + +def integration_test_executor() -> dict: + """ + Run a full update pipeline for Jamies account + """ + training_week_update_executor( + get_user(os.environ["JAMIES_ATHLETE_ID"]), ExeType.MID_WEEK + ) + training_week_update_executor( + get_user(os.environ["JAMIES_ATHLETE_ID"]), ExeType.NEW_WEEK + ) + return {"success": True} + + +def nightly_trigger_orchestrator() -> dict: + """ + Evenings excluding Sunday: Send update to users who have not yet triggered an update today + Sunday evening: Send new training week to all active users + """ + if datetime_now_est().weekday() != 6: + for user in list_users(): + if user.is_active and not has_user_updated_today(user.athlete_id): + training_week_update_executor(user, ExeType.MID_WEEK) + else: + for user in list_users(): + if user.is_active: + training_week_update_executor(user, ExeType.NEW_WEEK) + return {"success": True} diff --git a/src/webhook_router.py b/src/webhook_router.py new file mode 100644 index 0000000..9e3235e --- /dev/null +++ b/src/webhook_router.py @@ -0,0 +1,46 @@ +import os + +from src import auth_manager +from src.supabase_client import ( + get_user, +) +from src.types.update_pipeline import ExeType +from src.update_pipeline import training_week_update_executor + + +def handle_activity_create(user, event): + strava_client = auth_manager.get_strava_client(user.athlete_id) + activity = strava_client.get_activity(event.get("object_id")) + + if activity.sport_type == "Run": + return training_week_update_executor(user, ExeType.MID_WEEK) + + return {"success": False, "error": "Unsupported activity type"} + + +def handle_request(event: dict) -> dict: + """ + Handle Strava webhook events for activities and athletes. + + :param event: Webhook event payload from Strava + :return: dict with {"success": bool, "message": str (optional)} + """ + if int(event.get("subscription_id")) != int( + os.environ["STRAVA_WEBHOOK_SUBSCRIPTION_ID"] + ): + return {"success": False, "error": "Invalid subscription ID"} + + user = get_user(event.get("owner_id")) + event_type = event.get("object_type") + aspect_type = event.get("aspect_type") + + if event_type == "activity": + if aspect_type == "create": + return handle_activity_create(user, event) + if aspect_type in {"update", "delete"}: + return { + "success": True, + "message": f"Activity {event.get('object_id')} {aspect_type}d", + } + + return {"success": False, "error": "Unknown event type"} diff --git a/test.ipynb b/test.ipynb index ddc36ff..203cfb8 100644 --- a/test.ipynb +++ b/test.ipynb @@ -215,9 +215,41 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "No rates present in response headers\n" + ] + } + ], + "source": [ + "from src.supabase_client import get_user\n", + "from src.auth_manager import get_strava_client\n", + "\n", + "user = get_user(104454087)\n", + "strava_client = get_strava_client(user.athlete_id)" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [] }, { diff --git a/tests/test_full_week.py b/tests/test_full_week.py index d3ee2ed..2c9bcaf 100644 --- a/tests/test_full_week.py +++ b/tests/test_full_week.py @@ -5,10 +5,10 @@ from stravalib.client import Client from src.auth_manager import get_strava_client -from src.daily_pipeline import ( - daily_generic_pipeline, +from src.update_pipeline import ( mid_week_update_pipeline_test, new_training_week_pipeline, + training_week_update_pipeline, ) from src.supabase_client import upsert_training_week_test from src.types.training_week import TrainingWeek @@ -22,7 +22,7 @@ def gen_helper( ) -> TrainingWeek: @freeze_time(f"{date_str} 20:30:00") def wrapped(user: UserRow) -> TrainingWeek: - training_week = daily_generic_pipeline( + training_week = training_week_update_pipeline( user=user, pipeline_function=func, upsert_training_week=upsert_training_week_test, diff --git a/web/src/app/strava_webhook/route.tsx b/web/src/app/strava_webhook/route.tsx index fb26c9c..7c98bd4 100644 --- a/web/src/app/strava_webhook/route.tsx +++ b/web/src/app/strava_webhook/route.tsx @@ -20,22 +20,28 @@ export async function GET(request: NextRequest) { export async function POST(request: NextRequest) { const event = await request.json(); console.log(`Received POST request with event: ${JSON.stringify(event)}`); - try { - const response = await fetch('https://lwg77yq7dd.execute-api.us-east-1.amazonaws.com/prod/signup', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(event), - }); - if (response.ok) { - console.log('Successful response from signup API'); - return NextResponse.json({ success: true }); - } else { - const errorData = await response.json(); - console.log(`Error response from signup API: ${JSON.stringify(errorData)}`); - return NextResponse.json({ success: false, error: errorData }, { status: 500 }); + + // Respond immediately to Strava with 200 OK + const immediateResponse = NextResponse.json({}, { status: 200 }); + + (async () => { + try { + const response = await fetch('https://lwg77yq7dd.execute-api.us-east-1.amazonaws.com/prod/signup', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(event), + }); + + if (response.ok) { + console.log('Successful response from signup API'); + } else { + const errorData = await response.json(); + console.log(`Error response from signup API: ${JSON.stringify(errorData)}`); + } + } catch (error) { + console.error(`Error occurred while processing POST request: ${error}`); } - } catch (error) { - console.error(`Error occurred while processing POST request: ${error}`); - return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 }); - } + })(); + + return immediateResponse; }