diff --git a/authAPI.py b/authAPI.py new file mode 100644 index 0000000..a5d2e84 --- /dev/null +++ b/authAPI.py @@ -0,0 +1,86 @@ +from fastapi import APIRouter, HTTPException, Depends, status +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from passlib.context import CryptContext +from jose import JWTError, jwt +from sqlalchemy.orm import Session +from datetime import datetime, timedelta +from models import User # Your SQLAlchemy User model +from database import get_db # DB Session dependency + +SECRET_KEY = "your-secret-key" # Use env var in production! +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 60 + +auth_router = APIRouter(tags=["Authentication"]) +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login") + + +def hash_password(password: str): + return pwd_context.hash(password) + + +def verify_password(plain_password: str, hashed_password: str): + return pwd_context.verify(plain_password, hashed_password) + + +def create_access_token(data: dict, expires_delta: timedelta = None): + to_encode = data.copy() + expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15)) + to_encode.update({"exp": expire}) + return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + + +def get_user_by_username(db: Session, username: str): + return db.query(User).filter(User.username == username).first() + + +def authenticate_user(db: Session, username: str, password: str): + user = get_user_by_username(db, username) + if not user or not verify_password(password, user.password): + return None + return user + + +async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials.", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username = payload.get("sub") + if username is None: + raise credentials_exception + except JWTError: + raise credentials_exception + + user = get_user_by_username(db, username) + if user is None: + raise credentials_exception + return user + + +@auth_router.post("/auth/signup") +def signup(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): + if get_user_by_username(db, form_data.username): + raise HTTPException(status_code=400, detail="Username already taken") + user = User( + username=form_data.username, + password=hash_password(form_data.password), + ) + db.add(user) + db.commit() + db.refresh(user) + return {"msg": "User created successfully"} + + +@auth_router.post("/auth/login") +def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): + user = authenticate_user(db, form_data.username, form_data.password) + if not user: + raise HTTPException(status_code=400, detail="Incorrect username or password") + + token = create_access_token(data={"sub": user.username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) + return {"access_token": token, "token_type": "bearer"}