diff --git a/mnamer/__main__.py b/mnamer/__main__.py index 259f711c..5de30c8c 100644 --- a/mnamer/__main__.py +++ b/mnamer/__main__.py @@ -19,6 +19,7 @@ def main(): # pragma: no cover tty.error(e) raise SystemExit(2) from None try: + tty.configure(settings) frontend = Cli(settings) frontend.launch() except SystemExit: diff --git a/mnamer/endpoints.py b/mnamer/endpoints.py index 16b5047d..967c63fb 100644 --- a/mnamer/endpoints.py +++ b/mnamer/endpoints.py @@ -4,6 +4,8 @@ from re import match from time import sleep +import Levenshtein + from mnamer.exceptions import ( MnamerException, MnamerNetworkException, @@ -204,30 +206,16 @@ def tvdb_login(api_key: str | None) -> str: Note: You can register for a free TVDb key at thetvdb.com/?tab=apiregister Online docs: api.thetvdb.com/swagger#!/Authentication/post_login. """ - url = "https://api.thetvdb.com/login" + url = "https://api4.thetvdb.com/v4/login" body = {"apikey": api_key} status, content = request_json(url, body=body, cache=False) if status == 401: raise MnamerException("invalid api key") - elif status != 200 or not content.get("token"): # pragma: no cover + elif ( + status != 200 or not content.get("data") or not content.get("data").get("token") + ): # pragma: no cover raise MnamerNetworkException("TVDb down or unavailable?") - return content["token"] - - -def tvdb_refresh_token(token: str) -> str: - """ - Refreshes JWT token. - - Online docs: api.thetvdb.com/swagger#!/Authentication/get_refresh_token. - """ - url = "https://api.thetvdb.com/refresh_token" - headers = {"Authorization": f"Bearer {token}"} - status, content = request_json(url, headers=headers, cache=False) - if status == 401: - raise MnamerException("invalid token") - elif status != 200 or not content.get("token"): # pragma: no cover - raise MnamerNetworkException("TVDb down or unavailable?") - return content["token"] + return content.get("data").get("token") def tvdb_episodes_id( @@ -242,7 +230,7 @@ def tvdb_episodes_id( Online docs: https://api.thetvdb.com/swagger#!/Episodes. """ Language.ensure_valid_for_tvdb(language) - url = f"https://api.thetvdb.com/episodes/{id_tvdb}" + url = f"https://api4.thetvdb.com/v4/episodes/{id_tvdb}" headers = {"Authorization": f"Bearer {token}"} if language: headers["Accept-Language"] = language.a2 @@ -257,6 +245,20 @@ def tvdb_episodes_id( raise MnamerNetworkException("TVDb down or unavailable?") elif content["data"]["id"] == 0: raise MnamerNotFoundException + + if language: + url_trans = ( + f"https://api4.thetvdb.com/v4/episodes/{id_tvdb}/translations/{language.a3}" + ) + trans_status, trans_content = request_json( + url_trans, headers=headers, cache=cache + ) + + if trans_status == 200 and trans_content.get("data"): + trans_data = trans_content["data"] + for key, value in trans_data.items(): + if value: + content["data"][key] = value return content @@ -273,13 +275,25 @@ def tvdb_series_id( Online docs: api.thetvdb.com/swagger#!/Series/get_series_id. """ Language.ensure_valid_for_tvdb(language) - url = f"https://api.thetvdb.com/series/{id_tvdb}" + url = f"https://api4.thetvdb.com/v4/series/{id_tvdb}" headers = {"Authorization": f"Bearer {token}"} if language: headers["Accept-Language"] = language.a2 status, content = request_json( url, headers=headers, cache=cache is True and language is None ) + if language: + urlTranslated = ( + f"https://api4.thetvdb.com/v4/series/{id_tvdb}/translations/{language.a3}" + ) + statusTranslated, contentTranslated = request_json( + urlTranslated, headers=headers, cache=cache is True and language is None + ) + if statusTranslated == 200 and contentTranslated.get("data"): + for key in contentTranslated.get("data").keys(): + content["data"][key] = contentTranslated["data"][key] + else: + raise MnamerNotFoundException if status == 401: raise MnamerException("invalid token") elif status == 404 or not content.get("data"): @@ -294,7 +308,7 @@ def tvdb_series_id( def tvdb_series_id_episodes( token: str, id_tvdb: str, - page: int = 1, + page: int = 0, language: Language | None = None, cache: bool = True, ) -> dict: @@ -305,7 +319,7 @@ def tvdb_series_id_episodes( Online docs: api.thetvdb.com/swagger#!/Series/get_series_id_episodes. """ Language.ensure_valid_for_tvdb(language) - url = f"https://api.thetvdb.com/series/{id_tvdb}/episodes" + url = f"https://api4.thetvdb.com/v4/series/{id_tvdb}/episodes/default" headers = {"Authorization": f"Bearer {token}"} if language: headers["Accept-Language"] = language.a2 @@ -319,6 +333,19 @@ def tvdb_series_id_episodes( raise MnamerNotFoundException elif status != 200: # pragma: no cover raise MnamerNetworkException("TVDb down or unavailable?") + + if language: + url_trans = f"https://api4.thetvdb.com/v4/series/{id_tvdb}/episodes/default/{language.a3}" + trans_status, trans_content = request_json( + url_trans, headers=headers, cache=cache + ) + + if trans_status == 200 and trans_content.get("data"): + trans_data = trans_content["data"] + for key, value in trans_data.items(): + if value: + content["data"][key] = value + content.get("data", {}).get("episodes", []).sort(key=lambda e: e["id"]) return content @@ -327,35 +354,54 @@ def tvdb_series_id_episodes_query( id_tvdb: str, episode: int | None = None, season: int | None = None, - page: int = 1, + page: int = 0, language: Language | None = None, cache: bool = True, ) -> dict: """ - Allows the user to query against episodes for the given series. + Query episodes for a given series in TVDB v4 by filtering client-side. - Note: Paginated with 100 results per page; omitted imdbId-- when would you - ever need to query against both tvdb and imdb series ids? - Online docs: api.thetvdb.com/swagger#!/Series/get_series_id_episodes_query. + Online docs: https://thetvdb.github.io/v4-api/ """ Language.ensure_valid_for_tvdb(language) - url = f"https://api.thetvdb.com/series/{id_tvdb}/episodes/query" headers = {"Authorization": f"Bearer {token}"} + + url = f"https://api4.thetvdb.com/v4/series/{id_tvdb}/episodes/default" if language: headers["Accept-Language"] = language.a2 - parameters = {"airedSeason": season, "airedEpisode": episode, "page": page} + url = f"{url}/{language.a3}" + current_page = max(page or 0, 0) + matches: list[dict] = [] + + parameters = {"page": current_page} status, content = request_json( - url, - parameters, - headers=headers, - cache=cache is True and language is None, + url, parameters, headers=headers, cache=cache is True and language is None ) if status == 401: raise MnamerException("invalid token") - elif status == 404 or not content.get("data"): + elif status == 404 or not content or not content.get("data"): raise MnamerNotFoundException - elif status != 200: # pragma: no cover + elif status != 200: raise MnamerNetworkException("TVDb down or unavailable?") + items = content.get("data", {}).get("episodes", []) + hasValidAbsoluteEpisode = ( + episode != 0 + and len(items) > 0 + and max([i.get("absoluteNumber", 0) for i in items]) > 0 + ) + if not len(items): + raise MnamerNotFoundException + for item in items: + sn_ok = True if season is None else item.get("seasonNumber") == season + if season is None and episode is not None and hasValidAbsoluteEpisode: + ep_ok = item.get("absoluteNumber") == episode + else: + ep_ok = True if episode is None else item.get("number") == episode + if sn_ok and ep_ok: + matches.append(item) + if "data" in content and "episodes" in content["data"]: + matches.sort(key=lambda e: e["id"]) + content["data"]["episodes"] = matches return content @@ -370,15 +416,29 @@ def tvdb_search_series( """ Allows the user to search for a series based on the following parameters. - Online docs: https://api.thetvdb.com/swagger#!/Search/get_search_series - Note: results a maximum of 100 entries per page, no option for pagination. + Online swagger docs: https://thetvdb.github.io/v4-api/ """ Language.ensure_valid_for_tvdb(language) - url = "https://api.thetvdb.com/search/series" - parameters = {"name": series, "imdbId": id_imdb, "zap2itId": id_zap2it} headers = {"Authorization": f"Bearer {token}"} if language: headers["Accept-Language"] = language.a2 + + provided = [p is not None for p in (series, id_imdb, id_zap2it)] + if sum(provided) != 1: + raise MnamerException( + "series, id_imdb, id_zap2it parameters are mutually exclusive" + ) + + if series is not None: + url = "https://api4.thetvdb.com/v4/search" + parameters = {"query": series, "type": "series"} + if language: + parameters["language"] = language.a3 + else: + remote_id = id_imdb or id_zap2it + url = f"https://api4.thetvdb.com/v4/search/remoteid/{remote_id}" + parameters = None + status, content = request_json( url, parameters, headers=headers, cache=cache is True and language is None ) @@ -388,10 +448,34 @@ def tvdb_search_series( raise MnamerException( "series, id_imdb, id_zap2it parameters are mutually exclusive" ) - elif status == 404 or not content.get("data"): + elif status == 404 or not content or not content.get("data"): raise MnamerNotFoundException elif status != 200: # pragma: no cover raise MnamerNetworkException("TVDb down or unavailable?") + + def get_titles(serie_entry): + """Return all possible title variations (name, aliases, translations).""" + titles = [] + if name := serie_entry.get("name"): + titles.append(name) + titles += serie_entry.get("aliases", []) + titles += list(serie_entry.get("translations", {}).values()) + return [t.lower().strip() for t in titles if t] + + def sort_by_similarity(matched_series, target_name): + if target_name is None: + return + target_name = target_name.lower().strip() + return sorted( + matched_series, + key=lambda s: min( + Levenshtein.distance(title, target_name.lower().strip()) + for title in get_titles(s) + if title + ), + ) + + # content["data"] = sort_by_similarity(content["data"], series) return content diff --git a/mnamer/language.py b/mnamer/language.py index e5df124b..21e33162 100644 --- a/mnamer/language.py +++ b/mnamer/language.py @@ -1,6 +1,8 @@ from __future__ import annotations import dataclasses +import re +from pathlib import PurePath from typing import Any from mnamer.exceptions import MnamerException @@ -32,6 +34,36 @@ ) +def _guess_lang_from_windows_path(filePath: PurePath) -> str | None: + try: + from guessit import guessit + + g = guessit(filePath.name, {"type": "subtitle"}) + lang = g.get("subtitle_language") or g.get("language") + if isinstance(lang, list) and lang: + lang = str(lang[0]) + if isinstance(lang, str) and lang: + return lang.lower() + except Exception: + pass + + def force_guess_directly_from_path(p): + _LANG_BASE = r"[a-z]{2,3}" + _LANG_VARIANT = r"(?:[-_][a-z0-9]{2,4})?" + _BOUNDARY_LEFT = r"(?:^|[.\-_ \[(])" + _BOUNDARY_RIGHT = r"(?=\.srt$)" + _LANG_NEAR_END = re.compile( + _BOUNDARY_LEFT + r"(" + _LANG_BASE + _LANG_VARIANT + r")" + _BOUNDARY_RIGHT, + re.IGNORECASE, + ) + m = _LANG_NEAR_END.search(p.name) + if m: + return m.group(1).lower() + return p.stem[-3:].lower() + + return force_guess_directly_from_path(filePath) + + @dataclasses.dataclass class Language: """dataclass including the name, ISO 639-2, and ISO 639-1 language codes""" @@ -44,6 +76,8 @@ class Language: def parse(cls, value: Any) -> Language | None: if not value: return None + if isinstance(value, PurePath): + return cls.parse(_guess_lang_from_windows_path(value)) if isinstance(value, cls): return value if isinstance(value, dict): @@ -58,7 +92,7 @@ def parse(cls, value: Any) -> Language | None: value = value.lower() for row in KNOWN_LANGUAGES: for item in row: - if value == item: + if value == item or (isinstance(value, str) and value[-2:] == item): return cls(row[0].capitalize(), row[1], row[2]) raise MnamerException("Could not determine language") diff --git a/mnamer/metadata.py b/mnamer/metadata.py index 5a6988a8..46a8ef91 100644 --- a/mnamer/metadata.py +++ b/mnamer/metadata.py @@ -45,6 +45,7 @@ class Metadata: language_sub: Language | None = None quality: str | None = None synopsis: str | None = None + original: str | None = None @classmethod def to_media_type(cls) -> MediaType: @@ -67,7 +68,10 @@ def __setattr__(self, key: str, value: Any): } converter: Callable | None = converter_map.get(key) if value is not None and converter: - value = converter(value) + try: + value = converter(value) + except Exception: + value = None super().__setattr__(key, value) def __format__(self, format_spec: str | None): @@ -176,3 +180,12 @@ def __setattr__(self, key: str, value: Any): if value is not None and converter: value = converter(value) super().__setattr__(key, value) + + def is_invalid_season(self): + return not isinstance(self.season, int) or self.season > 1500 + + @property + def year(self) -> int | None: + if self.is_invalid_season(): + return self.season + return None diff --git a/mnamer/providers.py b/mnamer/providers.py index 3b03703d..c9444401 100644 --- a/mnamer/providers.py +++ b/mnamer/providers.py @@ -28,7 +28,7 @@ from mnamer.metadata import Metadata, MetadataEpisode, MetadataMovie from mnamer.setting_store import SettingStore from mnamer.types import MediaType, ProviderType -from mnamer.utils import parse_date, year_range_parse +from mnamer.utils import parse_date, request_json, year_range_parse class Provider(ABC): @@ -213,7 +213,7 @@ def _search_name(self, name: str, year: str | None, language: Language | None): class Tvdb(Provider): """Queries the TVDb API.""" - api_key: str = environ.get("API_KEY_TVDB", "E69C7A2CEF2F3152") + api_key: str = environ.get("API_KEY_TVDB", "7eef8b26-3af2-4431-9c4e-8547e98efff4") def __init__(self, api_key: str | None = None, cache: bool = True): super().__init__(api_key, cache) @@ -228,15 +228,38 @@ def search(self, query: MetadataEpisode) -> Iterator[MetadataEpisode]: assert query if not self.token: self.token = self._login() - if query.id_tvdb and query.date: + if query.id_tvdb is not None and query.date is not None: results = self._search_tvdb_date(query.id_tvdb, query.date, query.language) - elif query.id_tvdb: + elif query.id_tvdb is not None: results = self._search_id( query.id_tvdb, query.season, query.episode, query.language ) - elif query.series and query.date: + elif query.series is not None and query.date is not None: results = self._search_series_date(query.series, query.date, query.language) - elif query.series: + elif ( + query.series is not None + and query.year is not None + and query.episode is not None + ): + episode = self._get_episode_by_serie_episode_year( + f"{query.series} ({query.year})", + query.year, + query.episode, + query.language, + ) + results = ( + self._search_id( + episode.get("seriesId"), + episode.get("seasonNumber"), + episode.get("number"), + query.language, + ) + if episode + else self._search_series( + query.series, query.season, query.episode, query.language + ) + ) + elif query.series is not None: results = self._search_series( query.series, query.season, query.episode, query.language ) @@ -255,7 +278,7 @@ def _search_id( series_data = tvdb_series_id( self.token, id_tvdb, language=language, cache=self.cache ) - page = 1 + page = 0 while True: episode_data = tvdb_series_id_episodes_query( self.token, @@ -266,30 +289,91 @@ def _search_id( page=page, cache=self.cache, ) - for entry in episode_data["data"]: + for entry in episode_data.get("data", {}).get("episodes", []): try: yield MetadataEpisode( - date=entry["firstAired"], - episode=entry["airedEpisodeNumber"], + date=entry["aired"], + episode=entry["number"], id_tvdb=id_tvdb, - season=entry["airedSeason"], - series=series_data["data"]["seriesName"], + season=entry["seasonNumber"], + series=series_data["data"]["name"], language=language, synopsis=(entry["overview"] or "") .replace("\r\n", "") .replace(" ", "") .strip(), - title=entry["episodeName"].split(";", 1)[0], + title=entry["name"].split(";", 1)[0], ) found = True except (AttributeError, KeyError, ValueError): continue - if page == episode_data["links"]["last"]: + if episode_data["links"]["next"] is None: break page += 1 if not found: raise MnamerNotFoundException + def _get_episode_by_serie_episode_year( + self, + series: str, + year: int, + episode: int | None = None, + language: Language | None = None, + cache: bool = False, + ) -> int | None: + """ + Search a series on TVDB and return the season number aired in the given year. + If `episode` is provided, tries to match that episode number. + """ + + # 1️⃣ Search for the series + url = "https://api4.thetvdb.com/v4/search" + parameters = {"query": series, "type": "series"} + + headers = {"Authorization": f"Bearer {self.token}"} + if language: + headers["Accept-Language"] = language.a2 + + status, content = request_json( + url, parameters, headers=headers, cache=cache is True and language is None + ) + if status != 200 or "data" not in content or not content["data"]: + return None + + series_id = content["data"][0]["tvdb_id"] + + # 2️⃣ Get all episodes of this series + url = f"https://api4.thetvdb.com/v4/series/{series_id}/episodes/default" + status, content = request_json( + url, None, headers=headers, cache=cache is True and language is None + ) + if status != 200 or "data" not in content: + return None + + episodes = content.get("data", {}).get("episodes", []) + + # 3️⃣ Filter by year and optionally episode + matched = [ + ep + for ep in episodes + if ep.get("aired", "").startswith(str(year)) + and (episode is None or ep.get("number") == episode) + ] + + if not matched: + matched = [ + ep + for ep in episodes + if ep.get("aired", "").startswith(str(year)) + and (episode is None or ep.get("absoluteNumber") == episode) + ] + + if not matched: + return None + + # 4️⃣ Return the first matching season number + return matched[0] + def _search_series( self, series: str, @@ -302,10 +386,10 @@ def _search_series( self.token, series, language=language, cache=self.cache ) - for series_id in [entry["id"] for entry in series_data["data"][:5]]: + for series_id in [entry["tvdb_id"] for entry in series_data["data"][:5]]: try: for data in self._search_id(series_id, season, episode, language): - if not data.series or not data.season: + if data.series is None or data.season is None: continue found = True yield data @@ -333,7 +417,7 @@ def _search_series_date( series_data = tvdb_search_series( self.token, series, language=language, cache=self.cache ) - tvdb_ids = [entry["id"] for entry in series_data["data"]][:5] + tvdb_ids = [entry["tvdb_id"] for entry in series_data["data"]][:5] found = False for tvdb_id in tvdb_ids: try: diff --git a/mnamer/setting_store.py b/mnamer/setting_store.py index 7b2e60b1..b6dba27d 100644 --- a/mnamer/setting_store.py +++ b/mnamer/setting_store.py @@ -10,7 +10,7 @@ from mnamer.language import Language from mnamer.metadata import Metadata from mnamer.setting_spec import SettingSpec -from mnamer.types import MediaType, ProviderType, SettingType +from mnamer.types import MediaType, ProviderType, RelocateType, SettingType from mnamer.utils import crawl_out, json_loads, normalize_containers @@ -218,6 +218,16 @@ class SettingStore: help="--episode-format: set episode renaming format specification", ).as_dict(), ) + relocation_strategy: str = dataclasses.field( + default=RelocateType.MOVE.value, + metadata=SettingSpec( + dest="relocation_strategy", + choices=[ix.value for ix in RelocateType], + flags=["--relocation-operation"], + group=SettingType.PARAMETER, + help=f"--relocation-operation={'|'.join([ix.value for ix in RelocateType])}: when given, link, copy or move files. Default move. (PS1: Symlink doesnt works on windows) (PS2: Hardlinks can only be created between folders on the same drive.)", + )(), + ) # directive attributes ----------------------------------------------------- diff --git a/mnamer/target.py b/mnamer/target.py index 1f601a9c..635d2b0b 100644 --- a/mnamer/target.py +++ b/mnamer/target.py @@ -3,17 +3,19 @@ import datetime as dt from os import path from pathlib import Path -from shutil import move from typing import Any, ClassVar +from charset_normalizer import from_path from guessit import guessit # type: ignore +from langdetect import detect +from mnamer import tty from mnamer.exceptions import MnamerException from mnamer.language import Language from mnamer.metadata import Metadata, MetadataEpisode, MetadataMovie from mnamer.providers import Provider from mnamer.setting_store import SettingStore -from mnamer.types import MediaType, ProviderType +from mnamer.types import MediaType, ProviderType, RelocateType from mnamer.utils import ( crawl_in, filename_replace, @@ -50,6 +52,13 @@ def __init__(self, file_path: Path, settings: SettingStore | None = None): self._replace_before() self._override_metadata_ids() self._register_provider() + tty.msg( + "Parsed filename: " + + str(file_path) + + " as:\n" + + str(self.metadata.as_dict()), + debug=True, + ) def __str__(self) -> str: if isinstance(self.source, Path): @@ -115,27 +124,7 @@ def destination(self) -> Path: return Path(directory, filename) def _parse(self, file_path: Path): - path_data: dict[str, Any] = {"language": self._settings.language} - if is_subtitle(self.source): - try: - path_data["language"] = Language.parse(self.source.stem[-2:]) - file_path = Path(self.source.parent, self.source.stem[:-2]) - except MnamerException: - pass - options = {"type": self._settings.media, "language": path_data["language"]} - raw_data = dict(guessit(str(file_path), options)) - if isinstance(raw_data.get("season"), list): - raw_data = dict(guessit(str(file_path.parts[-1]), options)) - for k, v in raw_data.items(): - if hasattr(v, "alpha3"): - try: - path_data[k] = Language.parse(v) - except MnamerException: - continue - elif isinstance(v, int | str | dt.date): - path_data[k] = v - elif isinstance(v, list) and all(isinstance(_, int | str) for _ in v): - path_data[k] = v[0] + path_data = self._path_metadata(file_path) if self._settings.media: media_type = self._settings.media elif path_data.get("type"): @@ -148,6 +137,7 @@ def _parse(self, file_path: Path): None: Metadata, }[media_type] self.metadata = meta_cls() + self.metadata.original = self.source.name self.metadata.quality = ( " ".join( path_data[key] @@ -166,16 +156,8 @@ def _parse(self, file_path: Path): ) self.metadata.language = path_data.get("language") self.metadata.group = path_data.get("release_group") - self.metadata.container = file_path.suffix or None - if not self.metadata.language: - try: - self.metadata.language = path_data.get("language") - except MnamerException: - pass - try: - self.metadata.language_sub = path_data.get("subtitle_language") - except MnamerException: - pass + self.metadata.container = path_data.get("container") + self.metadata.language_sub = path_data.get("subtitle_language") if isinstance(self.metadata, MetadataMovie): self.metadata.name = path_data.get("title") self.metadata.year = path_data.get("year") @@ -188,9 +170,61 @@ def _parse(self, file_path: Path): if alternative_title: self.metadata.series = f"{self.metadata.series} {alternative_title}" # adding year to title can reduce false positives - # year = path_data.get("year") - # if year: - # self.metadata.series = f"{self.metadata.series} {year}" + # if path_data.get("year"): + # self.metadata.series = f"{self.metadata.series} ({path_data.get("year")})" + + @staticmethod + def _detect_subtitle_language(file_path: str): + # Detect and decode using best guess for encoding + try: + result = from_path(file_path).best() + if not result: + return None + + text = str(result) + return Language.parse(detect(text)) + except Exception: + return None + + def _path_metadata(self, file_path): + path_data: dict[str, Any] = { + "language": self._settings.language, + "container": file_path.suffix or None, + "type": self._settings.media, + } + raw_data = dict( + guessit( + str_replace(str(file_path), self._settings.replace_before), path_data + ) + ) + if isinstance(raw_data.get("season"), list): + raw_data = dict(guessit(str(file_path.parts[-1]), path_data)) + for k, v in raw_data.items(): + if hasattr(v, "alpha3"): + try: + path_data[k] = Language.parse(v) + except MnamerException: + continue + elif isinstance(v, int | str | dt.date): + path_data[k] = v + elif isinstance(v, list) and all(isinstance(_, int | str) for _ in v): + path_data[k] = v[0] + if is_subtitle(self.source): + try: + path_data["subtitle_language"] = self._detect_subtitle_language( + str(file_path) + ) or Language.parse( + raw_data.get("subtitle_language") or self.source.stem[-3:] + ) + path_data["language"] = self._settings.language + except MnamerException: + pass + try: + path_data["language"] = Language.parse(path_data["language"]) + Language.ensure_valid_for_tvdb(path_data["language"]) + except MnamerException: + path_data["language"] = self._settings.language or Language.parse("eng") + return path_data def _override_metadata_ids(self): id_types = {"imdb", "tmdb", "tvdb", "tvmaze"} @@ -219,8 +253,9 @@ def _replace_before(self) -> None: continue if attr.startswith("_"): continue - value = str_replace(value, self._settings.replace_before) - setattr(self.metadata, attr, value) + setattr( + self.metadata, attr, str_replace(value, self._settings.replace_before) + ) def query(self) -> list[Metadata]: """Queries the target's respective media provider for metadata.""" @@ -240,9 +275,25 @@ def query(self) -> list[Metadata]: def relocate(self) -> None: """Performs the action of renaming and/or moving a file.""" + + def get_method(relocation_strategy: RelocateType): + from os import link, symlink + from shutil import copy, copy2, move + + strategies = { + RelocateType.MOVE: move, + RelocateType.HARDLINK: link, + RelocateType.SYMBOLICLINK: symlink, + RelocateType.COPY: copy, + RelocateType.COPY2: copy2, + } + return strategies[RelocateType(relocation_strategy)] + destination_path = Path(self.destination).resolve() destination_path.parent.mkdir(parents=True, exist_ok=True) try: - move(str(self.source), destination_path) + get_method(self._settings.relocation_strategy)( + str(self.source), destination_path + ) except OSError as e: # pragma: no cover raise MnamerException from e diff --git a/mnamer/types.py b/mnamer/types.py index 6f10d21f..b9771e0e 100644 --- a/mnamer/types.py +++ b/mnamer/types.py @@ -29,6 +29,14 @@ class ProviderType(Enum): OMDB = "omdb" +class RelocateType(Enum): + MOVE = "move" + HARDLINK = "hardlink" + SYMBOLICLINK = "symlink" + COPY = "copy" + COPY2 = "copy-with-metadata" + + class SettingType(Enum): DIRECTIVE = "directive" PARAMETER = "parameter" diff --git a/tests/__init__.py b/tests/__init__.py index f340d06d..7c9a7ade 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -46,7 +46,7 @@ "date": dt.date(2015, 2, 22), "episode": 11, "id_imdb": "tt1520211", - "id_tvdb": 153021, + "id_tvdb": "153021", "id_tvmaze": 73, "media": "television", "season": 5, @@ -57,7 +57,7 @@ "date": dt.date(1999, 11, 8), "episode": 13, "id_imdb": "tt0208616", - "id_tvdb": 78342, + "id_tvdb": "78342", "id_tvmaze": 30436, "media": "television", "season": 1, @@ -68,7 +68,7 @@ "date": dt.date(2015, 10, 19), "episode": 2, "id_imdb": "tt2802850", - "id_tvdb": 269613, + "id_tvdb": "269613", "id_tvmaze": 32, "media": "television", "season": 2, diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 5cead780..f4b7beb6 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -51,7 +51,7 @@ def fn(*args): code = e.code out += strip_format(capsys.readouterr().out.strip()) out += strip_format(capsys.readouterr().err.strip()) - with open(E2E_LOG, "a+") as fp: + with open(E2E_LOG, "a+", encoding="utf-8") as fp: fp.write("=" * 10 + "\n") fp.write(request.node.name + "\n") fp.write("-" * 10 + "\n") diff --git a/tests/e2e/test_directives.py b/tests/e2e/test_directives.py index 391d23e4..6d374e51 100644 --- a/tests/e2e/test_directives.py +++ b/tests/e2e/test_directives.py @@ -84,3 +84,26 @@ def test_test(e2e_run): result = e2e_run("--batch", "--test", ".") assert result.code == 0 assert "testing mode" in result.out + + +@pytest.mark.usefixtures("setup_test_dir") +def test_series_jp_absolute_numbering(e2e_run, setup_test_files): + setup_test_files( + "Attack on Titan - Episode 80 - 1080p BDRip x265 FLAC 2.0 Kira [SEV].mkv" + ) + result = e2e_run("--batch", "--lower", "--episode-api=tvdb", ".") + assert result.code == 0 + assert ( + "\u9032\u6483\u306e\u5de8\u4eba - s04e21 - \u4e8c\u5343\u5e74\u524d\u306e\u541b\u304b\u3089.mkv" + in result.out + ) # Attack on titan in jp escaped name + + +@pytest.mark.usefixtures("setup_test_dir") +def test_series_absolute_numbering(e2e_run, setup_test_files): + setup_test_files( + "Attack on Titan - Episode 80 - 1080p BDRip x265 FLAC 2.0 Kira [SEV].mkv" + ) + result = e2e_run("--batch", "--lower", "--episode-api=tvdb", "--language=eng", ".") + assert result.code == 0 + assert "attack on titan - s04e21 - from you, 2,000 years ago.mkv" in result.out diff --git a/tests/e2e/test_moving.py b/tests/e2e/test_relocation.py similarity index 57% rename from tests/e2e/test_moving.py rename to tests/e2e/test_relocation.py index 7b8ccfb6..65d23eaa 100644 --- a/tests/e2e/test_moving.py +++ b/tests/e2e/test_relocation.py @@ -1,3 +1,4 @@ +import platform from pathlib import Path import pytest @@ -10,6 +11,21 @@ ] +def files_in_cwd(): + return [f for f in Path.cwd().iterdir() if f.is_file()] + + +@pytest.mark.usefixtures("setup_test_dir") +def test_matching_different_language(e2e_run, setup_test_files): + setup_test_files( + "Quien a hierro mata [MicroHD 1080p][DTS 5.1-Castellano-AC3 5.1-Castellano+Subs][ES-EN].mkv" + ) + result = e2e_run("--batch", "--language=spa", "--media=movie", ".") + assert result.code == 0 + assert "Quien a Hierro Mata (2019).mkv" in result.out + assert "1 out of 1 files processed successfully" in result.out + + @pytest.mark.usefixtures("setup_test_dir") def test_absolute_path(e2e_run, setup_test_files): setup_test_files("kill.bill.vol.1.mkv") @@ -158,7 +174,6 @@ def test_format_id(e2e_run, setup_test_files): @pytest.mark.tvdb -@pytest.mark.xfail(strict=False) @pytest.mark.usefixtures("setup_test_dir") def test_format_id__tvdb(e2e_run, setup_test_files): setup_test_files("archer.2009.s10e07.webrip.x264-lucidtv.mp4") @@ -173,7 +188,6 @@ def test_format_id__tvdb(e2e_run, setup_test_files): @pytest.mark.tvmaze -@pytest.mark.xfail(strict=False) @pytest.mark.usefixtures("setup_test_dir") def test_format_season0(e2e_run, setup_test_files): setup_test_files("south.park.s00e01.mp4") @@ -211,3 +225,103 @@ def test_ambiguous_language_deletction(e2e_run, setup_test_files): ) result = e2e_run("--batch", ".") assert result.code == 0 + + +@pytest.mark.usefixtures("setup_test_dir") +def test_relocation_operation_copy(e2e_run, setup_test_files): + setup_test_files("aladdin.2019.avi") + files_before = files_in_cwd() + result = e2e_run("--relocation-operation=copy", "--batch", "--lower", ".") + assert result.code == 0 + assert "aladdin (2019).avi" in result.out + files_after = [f for f in files_in_cwd() if f not in files_before] + assert len(files_after) == len(files_before) and len(files_after) == 1 + assert files_before[0].exists() and files_after[0].exists() + + +@pytest.mark.usefixtures("setup_test_dir") +def test_relocation_operation_copy2(e2e_run, setup_test_files): + setup_test_files("aladdin.2019.avi") + files_before = files_in_cwd() + result = e2e_run( + "--relocation-operation=copy-with-metadata", "--batch", "--lower", "." + ) + assert result.code == 0 + assert "aladdin (2019).avi" in result.out + files_after = [f for f in files_in_cwd() if f not in files_before] + assert len(files_after) == len(files_before) and len(files_after) == 1 + assert files_before[0].stat().st_size == files_after[0].stat().st_size + assert files_before[0].stat().st_mode == files_after[0].stat().st_mode + assert int(files_before[0].stat().st_mtime) == int(files_after[0].stat().st_mtime) + assert files_before[0].stat().st_uid == files_after[0].stat().st_uid + assert files_before[0].stat().st_gid == files_after[0].stat().st_gid + + +@pytest.mark.usefixtures("setup_test_dir") +def test_relocation_operation_symlink(e2e_run, setup_test_files): + if platform.system() == "Windows": + return + setup_test_files("aladdin.2019.avi") + files_before = files_in_cwd() + result = e2e_run("--relocation-operation=symlink", "--batch", "--lower", ".") + assert result.code == 0 + assert "aladdin (2019).avi" in result.out + files_after = [f for f in files_in_cwd() if f not in files_before] + assert len(files_after) == len(files_before) and len(files_after) == 1 + assert files_after[0].is_symlink() and not files_before[0].is_symlink() + assert files_after[0].resolve() == files_before[0].resolve() + txt_example = "Test content for hardlink" + files_before[0].write_text(txt_example, encoding="utf-8") + assert files_after[0].read_text(encoding="utf-8") == txt_example + files_before[0].unlink() + assert not files_before[0].exists() + assert files_before[0].read_text(encoding="utf-8") != txt_example + + +@pytest.mark.usefixtures("setup_test_dir") +def test_relocation_operation_hardlink(e2e_run, setup_test_files): + setup_test_files("aladdin.2019.avi") + files_before = files_in_cwd() + result = e2e_run("--relocation-operation=hardlink", "--batch", "--lower", ".") + assert result.code == 0 + assert "aladdin (2019).avi" in result.out + files_after = [f for f in files_in_cwd() if f not in files_before] + assert len(files_after) == len(files_before) and len(files_after) == 1 + txt_example = "Test content for hardlink" + files_before[0].write_text(txt_example, encoding="utf-8") + assert files_after[0].read_text(encoding="utf-8") == txt_example + assert (files_before[0].stat().st_ino == files_after[0].stat().st_ino) and ( + files_before[0].stat().st_dev == files_after[0].stat().st_dev + ) + assert files_before[0].stat().st_nlink == 2 and files_after[0].stat().st_nlink == 2 + files_before[0].unlink() + assert ( + files_after[0].stat().st_nlink == 1 + and files_after[0].exists() + and not files_before[0].exists() + ) + assert files_after[0].read_text(encoding="utf-8") == txt_example + + +@pytest.mark.usefixtures("setup_test_dir") +def test_relocation_operation_move(e2e_run, setup_test_files): + setup_test_files("aladdin.2019.avi") + files_before = files_in_cwd() + result = e2e_run("--relocation-operation=move", "--batch", "--lower", ".") + assert result.code == 0 + assert "aladdin (2019).avi" in result.out + files_after = [f for f in files_in_cwd() if f not in files_before] + assert len(files_after) == len(files_before) and len(files_after) == 1 + assert not files_before[0].exists() and files_after[0].exists() + + +@pytest.mark.usefixtures("setup_test_dir") +def test_original_filename(e2e_run, setup_test_files): + setup_test_files("archer.2009.s10e07.webrip.x264-lucidtv.mp4") + result = e2e_run( + "--batch", + "--episode-format='{original}'", + ".", + ) + assert result.code == 0 + assert "archer.2009.s10e07.webrip.x264-lucidtv.mp4" in result.out diff --git a/tests/network/test_endpoints__tvdb.py b/tests/network/test_endpoints__tvdb.py index 87d5c905..83f3b7dc 100644 --- a/tests/network/test_endpoints__tvdb.py +++ b/tests/network/test_endpoints__tvdb.py @@ -3,7 +3,6 @@ from mnamer.endpoints import ( tvdb_episodes_id, tvdb_login, - tvdb_refresh_token, tvdb_search_series, tvdb_series_id, tvdb_series_id_episodes, @@ -22,42 +21,25 @@ EXPECTED_TOP_LEVEL_SHOW_KEYS = { "absoluteNumber", - "airedEpisodeNumber", - "airedSeason", - "airedSeasonID", - "airsAfterSeason", - "airsBeforeEpisode", - "airsBeforeSeason", - "contentRating", - "directors", - "dvdChapter", - "dvdDiscid", - "dvdEpisodeNumber", - "dvdSeason", - "episodeName", - "filename", - "firstAired", - "guestStars", + "aired", + "finaleType", "id", - "imdbId", + "image", + "imageType", "isMovie", - "language", "lastUpdated", - "lastUpdatedBy", + "name", + "nameTranslations", + "number", "overview", - "productionCode", + "overviewTranslations", + "runtime", + "seasonNumber", + "seasons", "seriesId", - "showUrl", - "siteRating", - "siteRatingCount", - "thumbAdded", - "thumbAuthor", - "thumbHeight", - "thumbWidth", - "writers", + "year", } - LOST_TVDB_ID_EPISODE = "127131" LOST_TVDB_ID_SERIES = "73739" THE_WITCHER_ID_SERIES = "362696" @@ -82,16 +64,6 @@ def test_tvdb_login__login_fail(): tvdb_login(JUNK_TEXT) -def test_tvdb_refresh_token__refresh_success(): - token = tvdb_login(Tvdb.api_key) - assert tvdb_refresh_token(token) is not None - - -def test_tvdb_refresh_token__refresh_fail(): - with pytest.raises(MnamerException): - tvdb_refresh_token(JUNK_TEXT) - - @pytest.mark.xfail(strict=False) def test_tvdb_episodes_id__invalid_token(): with pytest.raises(MnamerException): @@ -129,7 +101,7 @@ def test_tvdb_episodes_id__success(tvdb_token): def test_tvdb_episodes_id__language(tvdb_token): result = tvdb_episodes_id(tvdb_token, LOST_TVDB_ID_EPISODE, RUSSIAN_LANG) - assert result["data"]["episodeName"] == "Пилот (1)" + assert result["data"]["name"] == "Пилот (1)" def test_tvdb_episodes_id__language__invalid(tvdb_token): @@ -166,33 +138,27 @@ def test_tvdb_series_id__no_hits(tvdb_token): def test_tvdb_series_id__success(tvdb_token): expected_top_level_keys = { - "added", - "addedBy", - "airsDayOfWeek", - "airsTime", "aliases", - "banner", - "fanart", + "averageRuntime", + "defaultSeasonType", + "episodes", "firstAired", - "genre", "id", - "imdbId", - "language", + "image", + "isOrderRandomized", + "lastAired", "lastUpdated", - "network", - "networkId", + "name", + "nameTranslations", + "nextAired", + "originalCountry", + "originalLanguage", "overview", - "poster", - "rating", - "runtime", - "season", - "seriesId", - "seriesName", - "siteRating", - "siteRatingCount", + "overviewTranslations", + "score", "slug", "status", - "zap2itId", + "year", } result = tvdb_series_id(tvdb_token, LOST_TVDB_ID_SERIES) @@ -200,12 +166,12 @@ def test_tvdb_series_id__success(tvdb_token): assert "data" in result assert set(result["data"].keys()) == expected_top_level_keys assert str(result["data"]["id"]) == LOST_TVDB_ID_SERIES - assert result["data"]["seriesName"] == "Lost" + assert result["data"]["name"] == "Lost" def test_tvdb_series_id__language(tvdb_token): result = tvdb_series_id(tvdb_token, THE_WITCHER_ID_SERIES, RUSSIAN_LANG) - assert result["data"]["seriesName"] == "Ведьмак" + assert result["data"]["name"] == "Ведьмак" @pytest.mark.xfail(strict=False) @@ -238,7 +204,7 @@ def test_tvdb_series_id_episodes__success(tvdb_token): result = tvdb_series_id_episodes(tvdb_token, LOST_TVDB_ID_SERIES) assert isinstance(result, dict) assert "data" in result - entry = result["data"][0] + entry = result["data"]["episodes"][0] assert set(entry.keys()) == EXPECTED_TOP_LEVEL_SHOW_KEYS assert str(entry["id"]) == LOST_TVDB_ID_EPISODE @@ -247,7 +213,7 @@ def test_tvdb_series_id_episodes__language(tvdb_token): result = tvdb_series_id_episodes( tvdb_token, THE_WITCHER_ID_SERIES, language=RUSSIAN_LANG ) - assert result["data"][0]["episodeName"] == "Начало конца" + assert result["data"]["episodes"][0]["name"] == "Начало конца" @pytest.mark.xfail(strict=False) @@ -272,24 +238,24 @@ def test_tvdb_series_id_episodes_query__invalid_id_tvdb(tvdb_token): def test_tvdb_series_id_episodes_query__page_valid(tvdb_token): - tvdb_series_id_episodes_query(tvdb_token, LOST_TVDB_ID_SERIES, page=1) - tvdb_series_id_episodes_query(tvdb_token, LOST_TVDB_ID_SERIES, page=1, season=1) + tvdb_series_id_episodes_query(tvdb_token, LOST_TVDB_ID_SERIES, page=0) + tvdb_series_id_episodes_query(tvdb_token, LOST_TVDB_ID_SERIES, page=0, season=1) tvdb_series_id_episodes_query( - tvdb_token, LOST_TVDB_ID_SERIES, page=1, season=1, episode=1 + tvdb_token, LOST_TVDB_ID_SERIES, page=0, season=1, episode=1 ) with pytest.raises(MnamerNotFoundException): tvdb_series_id_episodes_query( - tvdb_token, LOST_TVDB_ID_SERIES, page=11, cache=False + tvdb_token, LOST_TVDB_ID_SERIES, page=10, cache=False ) with pytest.raises(MnamerNotFoundException): tvdb_series_id_episodes_query( - tvdb_token, LOST_TVDB_ID_SERIES, page=2, season=1, cache=False + tvdb_token, LOST_TVDB_ID_SERIES, page=1, season=0, cache=False ) with pytest.raises(MnamerNotFoundException): tvdb_series_id_episodes_query( tvdb_token, LOST_TVDB_ID_SERIES, - page=2, + page=1, season=1, episode=1, cache=False, @@ -300,10 +266,9 @@ def test_tvdb_series_id_episodes_query__success_id_tvdb(tvdb_token): result = tvdb_series_id_episodes_query(tvdb_token, LOST_TVDB_ID_SERIES) assert isinstance(result, dict) assert "data" in result - data = result["data"] - assert len(data) == 100 - actual_top_level_keys = set(data[0].keys()) - assert actual_top_level_keys == EXPECTED_TOP_LEVEL_SHOW_KEYS + data = result["data"]["episodes"] + assert len(data) == result["links"]["total_items"] and len(data) >= 100 + assert set(data[0].keys()) >= EXPECTED_TOP_LEVEL_SHOW_KEYS assert str(data[0]["id"]) == LOST_TVDB_ID_EPISODE @@ -311,9 +276,8 @@ def test_tvdb_series_id_episodes_query__success_id_tvdb_season(tvdb_token): result = tvdb_series_id_episodes_query(tvdb_token, LOST_TVDB_ID_SERIES, season=1) assert isinstance(result, dict) assert "data" in result - data = result["data"] - actual_top_level_keys = set(data[0].keys()) - assert actual_top_level_keys == EXPECTED_TOP_LEVEL_SHOW_KEYS + data = result["data"]["episodes"] + assert set(data[0].keys()) >= EXPECTED_TOP_LEVEL_SHOW_KEYS assert str(data[0]["id"]) == LOST_TVDB_ID_EPISODE assert result["links"]["prev"] is None assert result["links"]["next"] is None @@ -327,9 +291,8 @@ def test_tvdb_series_id_episodes_query__success_id_tvdb_season_episode( ) assert isinstance(result, dict) assert "data" in result - data = result["data"] - actual_top_level_keys = set(data[0].keys()) - assert actual_top_level_keys == EXPECTED_TOP_LEVEL_SHOW_KEYS + data = result["data"]["episodes"] + assert set(data[0].keys()) >= EXPECTED_TOP_LEVEL_SHOW_KEYS assert str(data[0]["id"]) == LOST_TVDB_ID_EPISODE assert result["links"]["prev"] is None assert result["links"]["next"] is None @@ -343,7 +306,7 @@ def test_tvdb_series_id_episodes_query(tvdb_token): episode=1, language=RUSSIAN_LANG, ) - assert result["data"][0]["episodeName"] == "Начало конца" + assert result["data"]["episodes"][0]["name"] == "Начало конца" def test_tvdb_search_series__invalid_token(): @@ -369,14 +332,13 @@ def test_tvdb_search_series__invalid_id_imdb(tvdb_token): def test_tvdb_search_series__success(tvdb_token): expected_top_level_keys = { "aliases", - "banner", - "firstAired", + "first_air_time", "id", - "image", + "image_url", "network", "overview", - "poster", - "seriesName", + "thumbnail", + "name", "slug", "status", } @@ -384,11 +346,10 @@ def test_tvdb_search_series__success(tvdb_token): assert isinstance(result, dict) assert "data" in result data = result["data"] - assert len(data) == 100 - actual_top_level_keys = set(data[0].keys()) - assert actual_top_level_keys == expected_top_level_keys + assert len(data) == result["links"]["page_size"] + assert set(data[0].keys()) >= expected_top_level_keys def test_tvdb_search_series__language(tvdb_token): results = tvdb_search_series(tvdb_token, "Witcher", language=RUSSIAN_LANG) - assert any(result["seriesName"] for result in results["data"]) + assert any(result["name"] for result in results["data"]) diff --git a/tests/network/test_providers__tvdb.py b/tests/network/test_providers__tvdb.py index 14f1fb8c..e9f9221d 100644 --- a/tests/network/test_providers__tvdb.py +++ b/tests/network/test_providers__tvdb.py @@ -80,7 +80,7 @@ def test_tvdb_provider__search__series(meta: dict, provider: Tvdb): def test_tvdb_provider__search__series_deep(provider: Tvdb): query = MetadataEpisode(series="House Rules (au)", season=6, episode=6) results = provider.search(query) - assert any(result.id_tvdb == 269795 for result in results) + assert any(result.id_tvdb == "269795" for result in results) @pytest.mark.parametrize("meta", EPISODE_META.values(), ids=list(EPISODE_META))