-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
solution #48
base: main
Are you sure you want to change the base?
solution #48
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
from typing import List | ||
from fastapi import HTTPException | ||
from sqlalchemy.exc import SQLAlchemyError | ||
from sqlalchemy.orm import Session | ||
from database.models import MovieModel, CountryModel, GenreModel, ActorModel, LanguageModel | ||
from schemas.movies import MovieCreateSchema, MovieUpdateSchema | ||
def get_movies(db: Session, offset, per_page): | ||
return db.query(MovieModel).order_by(MovieModel.id.desc()).offset(offset).limit(per_page).all() | ||
def get_movie(db: Session, movie_id): | ||
return db.query(MovieModel).filter(MovieModel.id == movie_id).first() | ||
def create_objects(data: List[str], model, db: Session): | ||
objects = [model(name=element) for element in data] | ||
db.add_all(objects) | ||
db.commit() | ||
for obj in objects: | ||
db.refresh(obj) | ||
return objects | ||
def create_movie(db: Session, movie: MovieCreateSchema): | ||
try: | ||
country_data = db.query(CountryModel).filter(CountryModel.code == movie.country).first() | ||
if not country_data: | ||
country_data = CountryModel(code=movie.country) | ||
db.add(country_data) | ||
db.commit() | ||
db.refresh(country_data) | ||
genres_data = db.query(GenreModel).filter(GenreModel.name.in_(movie.genres)).all() | ||
if not genres_data: | ||
genres_data = create_objects(movie.genres, GenreModel, db) | ||
actors_data = db.query(ActorModel).filter(ActorModel.name.in_(movie.actors)).all() | ||
if not actors_data: | ||
actors_data = create_objects(movie.actors, ActorModel, db) | ||
languages_data = db.query(LanguageModel).filter(LanguageModel.name.in_(movie.languages)).all() | ||
if not languages_data: | ||
languages_data = create_objects(movie.languages, LanguageModel, db) | ||
db_movie = MovieModel( | ||
name=movie.name, | ||
date=movie.date, | ||
score=movie.score, | ||
overview=movie.overview, | ||
status=movie.status, | ||
budget=movie.budget, | ||
revenue=movie.revenue, | ||
country=country_data, | ||
genres=genres_data, | ||
actors=actors_data, | ||
languages=languages_data | ||
) | ||
db.add(db_movie) | ||
db.commit() | ||
db.refresh(db_movie) | ||
return db_movie | ||
except SQLAlchemyError: | ||
db.rollback() | ||
raise HTTPException(status_code=400, detail="Invalid input data.") | ||
Comment on lines
+52
to
+54
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error handling in the |
||
def delete_movie(db: Session, movie_id: int): | ||
db_movie = db.query(MovieModel).filter(MovieModel.id == movie_id).first() | ||
if not db_movie: | ||
raise HTTPException(status_code=404, detail="Movie with the given ID was not found.") | ||
db.delete(db_movie) | ||
db.commit() | ||
def update_movie(db: Session, movie_id: int, movie: MovieUpdateSchema): | ||
db_movie = db.query(MovieModel).filter(MovieModel.id == movie_id).first() | ||
if not db_movie: | ||
raise HTTPException(status_code=404, detail="Movie with the given ID was not found.") | ||
for key, value in movie.dict(exclude_unset=True).items(): | ||
setattr(db_movie, key, value) | ||
db.commit() | ||
db.refresh(db_movie) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,64 @@ | ||
from fastapi import APIRouter, Depends, HTTPException, Query | ||
from sqlalchemy.exc import IntegrityError | ||
from sqlalchemy.orm import Session, joinedload | ||
|
||
import math | ||
|
||
from fastapi import APIRouter, Depends, HTTPException, Query, Path | ||
from sqlalchemy.orm import Session | ||
from crud.movies import get_movies, create_movie, get_movie, delete_movie, update_movie | ||
from database import get_db | ||
from database.models import MovieModel, CountryModel, GenreModel, ActorModel, LanguageModel | ||
|
||
from schemas.movies import MovieListSchema, MovieCreateSchema, MovieDetailSchema, MovieUpdateSchema, MoviePageSchema | ||
|
||
router = APIRouter() | ||
|
||
|
||
# Write your code here | ||
|
||
@router.get("/movies/", response_model=MoviePageSchema) | ||
async def list_movies( | ||
page: int = Query(1, ge=1), | ||
per_page: int = Query(10, ge=1, le=20), | ||
db: Session = Depends(get_db) | ||
): | ||
offset = (page - 1) * per_page | ||
movies_data = get_movies(db, offset, per_page) | ||
if movies_data: | ||
total_items = db.query(MovieModel).count() | ||
total_pages = math.ceil(total_items / per_page) | ||
prev_page = f"/theater/movies/?page={page - 1}&per_page={per_page}" if page > 1 else None | ||
next_page = f"/theater/movies/?page={page + 1}&per_page={per_page}" if page < total_pages else None | ||
return { | ||
"movies": movies_data, | ||
"prev_page": prev_page, | ||
"next_page": next_page, | ||
"total_pages": total_pages, | ||
"total_items": total_items | ||
} | ||
raise HTTPException(status_code=404, detail="No movies found.") | ||
@router.get("/movies/{movie_id}/", response_model=MovieDetailSchema) | ||
async def get_movie_detail( | ||
movie_id: int = Path(..., gt=0), | ||
db: Session = Depends(get_db) | ||
): | ||
movie = get_movie(db, movie_id) | ||
if movie: | ||
return movie | ||
raise HTTPException(status_code=404, detail="Movie with the given ID was not found.") | ||
@router.post("/movies/", response_model=MovieDetailSchema, status_code=201) | ||
def add_movie(movie: MovieCreateSchema, db: Session = Depends(get_db)): | ||
movies = db.query(MovieModel).filter( | ||
MovieModel.name == movie.name, | ||
MovieModel.date == movie.date | ||
).all() | ||
if not movies: | ||
return create_movie(db, movie) | ||
raise HTTPException( | ||
status_code=409, | ||
detail=f"A movie with the name '{movie.name}' and release date '{movie.date}' already exists." | ||
) | ||
@router.delete("/movies/{movie_id}/", status_code=204) | ||
def remove_movie(movie_id: int, db: Session = Depends(get_db)): | ||
delete_movie(db, movie_id) | ||
@router.patch("/movies/{movie_id}/") | ||
def patch_movie(movie_id: int, movie: MovieUpdateSchema, db: Session = Depends(get_db)): | ||
update_movie(db, movie_id, movie) | ||
return HTTPException(status_code=200, detail="Movie updated successfully.") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,70 @@ | ||
# Write your code here | ||
from datetime import date as date_module, timedelta | ||
from enum import Enum | ||
from typing import Optional, List | ||
from pydantic import BaseModel, Field | ||
class CountrySchema(BaseModel): | ||
id: int | ||
code: str | ||
name: str | None | ||
class GenresSchema(BaseModel): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The use of the pipe operator |
||
id: int | ||
name: str | ||
class ActorsSchema(BaseModel): | ||
id: int | ||
name: str | ||
class LanguagesSchema(BaseModel): | ||
id: int | ||
name: str | ||
class MovieStatus(str, Enum): | ||
released = "Released" | ||
post_production = "Post Production" | ||
in_production = "In Production" | ||
class MovieListSchema(BaseModel): | ||
id: int | ||
name: str | ||
date: date_module | ||
score: float | ||
overview: str | ||
class MoviePageSchema(BaseModel): | ||
movies: List[MovieListSchema] | ||
prev_page: Optional[str] = None | ||
next_page: Optional[str] = None | ||
total_pages: int | ||
total_items: int | ||
class MovieDetailSchema(BaseModel): | ||
id: int | ||
name: str | ||
date: date_module | ||
score: float = Field(ge=0, le=100) | ||
overview: str | ||
status: MovieStatus | ||
budget: float = Field(ge=0) | ||
revenue: float = Field(ge=0) | ||
country: CountrySchema | ||
genres: List[GenresSchema] | ||
actors: List[ActorsSchema] | ||
languages: List[LanguagesSchema] | ||
class MovieCreateSchema(BaseModel): | ||
name: str = Field(..., max_length=255) | ||
date: date_module = Field(..., le=(date_module.today() + timedelta(days=365))) | ||
score: float = Field(ge=0, le=100) | ||
overview: str | ||
status: MovieStatus | ||
budget: float = Field(ge=0) | ||
revenue: float = Field(ge=0) | ||
country: str | ||
genres: List[str] | ||
actors: List[str] | ||
languages: List[str] | ||
class MovieUpdateSchema(BaseModel): | ||
name: Optional[str] = Field(None, max_length=255) | ||
date: Optional[date_module] = Field(None, le=(date_module.today() + timedelta(days=365))) | ||
score: Optional[float] = Field(None, ge=0, le=100) | ||
overview: Optional[str] = None | ||
status: Optional[MovieStatus] = None | ||
budget: Optional[float] = Field(None, ge=0) | ||
revenue: Optional[float] = Field(None, ge=0) | ||
country: Optional[str] = None | ||
genres: Optional[List[str]] = None | ||
actors: Optional[List[str]] = None | ||
languages: Optional[List[str]] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
create_objects
function assumes that all elements in thedata
list are unique. If there are duplicates, this could lead to integrity errors when adding to the database. Consider adding a check to handle duplicates or ensure that the input data is pre-validated.