Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solution #26

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ max-line-length = 119
max-complexity = 18
select = B,C,E,F,W,T4,B9,ANN,Q0,N8,VNE
exclude = .venv
extend-exclude = src/tests/*
extend-exclude = src/tests/*, src/database/migrations/*, src/config/*, src/database/populate.py
169 changes: 166 additions & 3 deletions src/routes/movies.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,175 @@
from typing import Dict

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import desc
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload

from database import get_db
from database.models import MovieModel, CountryModel, GenreModel, ActorModel, LanguageModel

from database.models import (MovieModel,
CountryModel,
GenreModel,
ActorModel,
LanguageModel)
from schemas.movies import (MoviesBase,
MovieBase,
MovieDetailSchema,
MovieCreateSchema,
CountryBase,
GenreBase,
ActorBase,
LanguageBase,
MovieUpdateSchema)

router = APIRouter()


# Write your code here
# get all movies
@router.get("/movies/", response_model=MoviesBase)
def get_movies(db: Session = Depends(get_db),
page: int = Query(1, ge=1),
per_page: int = Query(10, ge=1)) -> MoviesBase:
movies = db.query(MovieModel).options(
joinedload(MovieModel.country),
joinedload(MovieModel.genres),
joinedload(MovieModel.actors),
joinedload(MovieModel.languages)
).order_by(desc(MovieModel.id)).limit(per_page).offset((page - 1) * per_page).all()

if not movies:
raise HTTPException(status_code=404, detail="No movies found.")

prev_page = f"/theater/movies/?page={page - 1}&per_page={per_page}" if page > 1 else None
next_page = f"/theater/movies/?page={page + 1}&per_page={per_page}" if len(movies) == per_page else None
total_pages = (db.query(MovieModel).count() + per_page - 1) // per_page
total_items = db.query(MovieModel).count()

return MoviesBase(
movies=[
MovieBase(
id=movie.id,
name=movie.name,
date=movie.date,
score=movie.score,
overview=movie.overview
) for movie in movies
],
prev_page=prev_page,
next_page=next_page,
total_items=total_items,
total_pages=total_pages
)


# movie creation
@router.post("/movies/", response_model=MovieDetailSchema, status_code=201)
def create_movie(movie: MovieCreateSchema, db: Session = Depends(get_db)) -> MovieDetailSchema:
movie_found = db.query(MovieModel).filter(MovieModel.name == movie.name, MovieModel.date == movie.date).first()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling potential IntegrityError exceptions that might occur during the database commit operations. This will help in providing more informative error messages to the client.


if movie_found:
raise HTTPException(status_code=409, detail=f"A movie with the name"
f" '{movie.name}' and"
f" release date '{movie.date}' already exists.")

country = db.query(CountryModel).filter(CountryModel.name == movie.country).first()

if not country:
country = CountryModel(name=movie.country, code=f"{movie.country}")
db.add(country)
db.commit()
db.refresh(country)

genres = []
for genre in movie.genres:
genre_found = db.query(GenreModel).filter(GenreModel.name == genre).first()
if not genre_found:
genre_found = GenreModel(name=genre)
db.add(genre_found)
db.commit()
db.refresh(genre_found)
genres.append(genre_found)

actors = []
for actor in movie.actors:
actor_found = db.query(ActorModel).filter(ActorModel.name == actor).first()
if not actor_found:
actor_found = ActorModel(name=actor)
db.add(actor_found)
db.commit()
db.refresh(actor_found)
actors.append(actor_found)

languages = []
for language in movie.languages:
language_found = db.query(LanguageModel).filter(LanguageModel.name == language).first()
if not language_found:
language_found = LanguageModel(name=language)
db.add(language_found)
db.commit()
db.refresh(language_found)
languages.append(language_found)

new_movie = MovieModel(
name=movie.name,
date=movie.date,
score=movie.score,
overview=movie.overview,
status=movie.status,
country_id=country.id,
genres=genres,
actors=actors,
languages=languages,
budget=movie.budget,
revenue=movie.revenue
)

db.add(new_movie)
db.commit()
db.refresh(new_movie)

return new_movie


# get movie by id
@router.get("/movies/{movie_id}/", response_model=MovieDetailSchema)
def get_movie(movie_id: int, db: Session = Depends(get_db)) -> MovieDetailSchema:
movie = db.query(MovieModel).filter(MovieModel.id == movie_id).first()

if not movie:
raise HTTPException(status_code=404, detail="Movie with the given ID was not found.")

return movie


# delete movie
@router.delete("/movies/{movie_id}/", status_code=204)
def delete_movie(movie_id: int, db: Session = Depends(get_db)):
movie = db.get(MovieModel, movie_id)

if not movie:
raise HTTPException(status_code=404, detail="Movie with the given ID was not found.")

db.delete(movie)
db.commit()


# update movie
@router.patch("/movies/{movie_id}/", status_code=200)
def update_movie(movie_id: int, movie: MovieUpdateSchema, db: Session = Depends(get_db)) -> dict[str, str]:
existing_movie = db.query(MovieModel).filter(MovieModel.id == movie_id).first()

if not existing_movie:
raise HTTPException(status_code=404, detail="Movie with the given ID was not found.")

for field, value in movie.model_dump(exclude_unset=True).items():
if value:
setattr(existing_movie, field, value)

try:
db.commit()
db.refresh(existing_movie)
except IntegrityError:
db.rollback()
raise HTTPException(status_code=400, detail="Invalid input data.")

return {"detail": "Movie updated successfully."}
83 changes: 82 additions & 1 deletion src/schemas/movies.py
Original file line number Diff line number Diff line change
@@ -1 +1,82 @@
# Write your code here
import datetime

from pydantic import BaseModel, Field, field_validator

from database.models import MovieStatusEnum


class MovieBase(BaseModel):
id: int
name: str = Field(min_length=1, max_length=255)
date: datetime.date
score: float
overview: str


class MoviesBase(BaseModel):
movies: list[MovieBase]
prev_page: str | None
next_page: str | None
total_pages: int
total_items: int


class GenreBase(BaseModel):
id: int
name: str = Field(min_length=1, max_length=255)


class ActorBase(BaseModel):
id: int
name: str = Field(min_length=1, max_length=255)


class LanguageBase(BaseModel):
id: int
name: str = Field(min_length=1, max_length=255)


class CountryBase(BaseModel):
id: int
name: str | None = None
code: str = Field(max_length=2)


class MovieCreateSchema(BaseModel):
name: str
date: datetime.date
score: float
overview: str
status: str
budget: float
revenue: float
country: str
genres: list[str]
actors: list[str]
languages: list[str]

@field_validator("date")
def validate(cls, value: str | datetime.date):
if value > datetime.date.today() + datetime.timedelta(days=365):
raise ValueError("The date must not be more than one year in the future.")
return value


class MovieDetailSchema(MovieBase):
status: str
budget: float = Field(ge=0)
revenue: float = Field(ge=0)
country: CountryBase
genres: list[GenreBase]
actors: list[ActorBase]
languages: list[LanguageBase]


class MovieUpdateSchema(BaseModel):
name: str | None = Field(max_length=255, default=None)
date: datetime.date = None
score: float = Field(ge=0, le=100, default=None)
overview: str = None
status: MovieStatusEnum = None
budget: float = Field(ge=0, default=None)
revenue: float = Field(ge=0, default=None)