Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ PARAMETERS:
-r, --recurse: search for files within nested directories
-s, --scene: use dots in place of alphanumeric chars
-v, --verbose: increase output verbosity
-w, --watch: watch target(s) for changes
--hits=<NUMBER>: limit the maximum number of hits for each query
--ignore=<PATTERN,...>: ignore files matching these regular expressions
--language=<LANG>: specify the search language
Expand Down
156 changes: 99 additions & 57 deletions mnamer/frontends.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import time
from abc import ABC, abstractmethod
from pathlib import Path

from watchdog.events import FileClosedEvent, FileSystemEventHandler
from watchdog.observers import Observer

from mnamer import tty
from mnamer.const import SYSTEM, USAGE, VERSION
Expand Down Expand Up @@ -63,7 +68,7 @@ def launch(self):
pass


class Cli(Frontend):
class Cli(Frontend, FileSystemEventHandler):
def __init__(self, settings: SettingStore):
super().__init__(settings)
if not settings.targets:
Expand All @@ -77,9 +82,43 @@ def total_count(self):

def launch(self) -> None:
tty.msg("Starting mnamer", MessageType.HEADING)
self._ensure_targets()
self._process_targets()
self._report_results()
if self.settings.watch:
self._watch_targets()
else:
self._ensure_targets()
self._process_targets()
self._report_results()

def _watch_targets(self) -> None:
observer = Observer()
for target in self.settings.targets:
tty.msg(f"watching {str(target.absolute())}", debug=True)
observer.schedule(
self, str(target.absolute()), recursive=self.settings.recurse
)

observer.start()
try:
self._process_targets()

while True:
time.sleep(5)
except:
observer.stop()
observer.join()

def on_closed(self, event: FileClosedEvent):
if event.is_directory:
return

target = Target.filter_file_path(self.settings, Path(event.src_path))
if target is None:
return

try:
self._process_target(target)
except FileNotFoundError:
tty.msg(f"file not found {str(target.source)}", MessageType.ALERT)

def _ensure_targets(self) -> None:
if not self.targets:
Expand All @@ -89,66 +128,69 @@ def _ensure_targets(self) -> None:

def _process_targets(self) -> None:
for target in self.targets:
self._announce_file(target)
self._list_details(target)
cancel = self._process_target(target)
if cancel:
break

# find match for target
matches = []
try:
matches = target.query()
except MnamerNotFoundException:
tty.msg("no matches found", MessageType.ALERT)
except MnamerNetworkException:
tty.msg("network error", MessageType.ALERT)
if not matches and self.settings.no_guess:
tty.msg("skipping (--no-guess)", MessageType.ALERT)
continue
def _process_target(self, target: Target) -> bool:
self._announce_file(target)
self._list_details(target)

# find match for target
matches = []
try:
matches = target.query()
except MnamerNotFoundException:
tty.msg("no matches found", MessageType.ALERT)
except MnamerNetworkException:
tty.msg("network error", MessageType.ALERT)
if not matches and self.settings.no_guess:
tty.msg("skipping (--no-guess)", MessageType.ALERT)
return False
try:
if self.settings.batch:
match = matches[0] if matches else target.metadata
elif not matches:
match = tty.metadata_guess(target.metadata)
else:
match = tty.metadata_prompt(matches)
except MnamerSkipException:
tty.msg("skipping (user request)", MessageType.ALERT)
return False
except MnamerAbortException:
tty.msg("aborting (user request)", MessageType.ERROR)
return True
target.metadata.update(match)

if is_subtitle(target.metadata.container) and not target.metadata.language_sub:
if self.settings.batch:
tty.msg(
"skipping (subtitle language can't be detected)",
MessageType.ALERT,
)
return False
try:
if self.settings.batch:
match = matches[0] if matches else target.metadata
elif not matches:
match = tty.metadata_guess(target.metadata)
else:
match = tty.metadata_prompt(matches)
target.metadata.language_sub = tty.subtitle_prompt()
except MnamerSkipException:
tty.msg("skipping (user request)", MessageType.ALERT)
continue
return False
except MnamerAbortException:
tty.msg("aborting (user request)", MessageType.ERROR)
break
target.metadata.update(match)

if (
is_subtitle(target.metadata.container)
and not target.metadata.language_sub
):
if self.settings.batch:
tty.msg(
"skipping (subtitle language can't be detected)",
MessageType.ALERT,
)
continue
try:
target.metadata.language_sub = tty.subtitle_prompt()
except MnamerSkipException:
tty.msg("skipping (user request)", MessageType.ALERT)
continue
except MnamerAbortException:
tty.msg("aborting (user request)", MessageType.ERROR)
break

# sanity check move
if target.destination == target.source:
tty.msg(
"skipping (source and destination paths are the same)",
MessageType.ALERT,
)
continue
if self.settings.no_overwrite and target.destination.exists():
tty.msg("skipping (--no-overwrite)", MessageType.ALERT)
continue
return True

# sanity check move
if target.destination == target.source:
tty.msg(
"skipping (source and destination paths are the same)",
MessageType.ALERT,
)
return False
if self.settings.no_overwrite and target.destination.exists():
tty.msg("skipping (--no-overwrite)", MessageType.ALERT)
return False

self._rename_and_move_file(target)
self._rename_and_move_file(target)
return False

def _announce_file(self, target: Target):
media_type = target.metadata.to_media_type().value.title()
Expand Down
9 changes: 9 additions & 0 deletions mnamer/setting_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ class SettingStore:
help="-v, --verbose: increase output verbosity",
).as_dict(),
)
watch: bool = dataclasses.field(
default=False,
metadata=SettingSpec(
action="store_true",
flags=["--watch", "-w"],
group=SettingType.PARAMETER,
help="-w, --watch: watch target(s) for changes",
).as_dict(),
)
hits: int = dataclasses.field(
default=5,
metadata=SettingSpec(
Expand Down
34 changes: 28 additions & 6 deletions mnamer/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,34 @@ def __str__(self) -> str:
def populate_paths(cls: Type[Target], settings: SettingStore) -> list[Target]:
"""Creates a list of Target objects for media files found in paths."""
file_paths = crawl_in(settings.targets, settings.recurse)
file_paths = filter_blacklist(file_paths, settings.ignore)
file_paths = filter_containers(file_paths, settings.mask)
targets = [cls(file_path, settings) for file_path in file_paths]
targets = list(dict.fromkeys(targets)) # unique values
targets = list(filter(cls._matches_media, targets))
return targets
return cls.find_targets(settings, file_paths)

@classmethod
def find_targets(
cls: Type[Target], settings: SettingStore, file_paths: list[Path]
) -> list[Target]:
targets = set[Target]()
for file_path in file_paths:
target = cls.filter_file_path(settings, file_path)
if target is not None:
targets.add(target)
return sorted(targets, key=lambda target: target.source)

@classmethod
def filter_file_path(
cls: Type[Target], settings: SettingStore, file_path: Path
) -> Target | None:
file_path = file_path.absolute()
if not filter_blacklist(file_path, settings.ignore):
return None
elif not filter_containers(file_path, settings.mask):
return None

target = cls(file_path, settings)
if not cls._matches_media(target):
return None

return target

@classmethod
def reset_providers(cls):
Expand Down
4 changes: 2 additions & 2 deletions mnamer/tty.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def msg(
if debug and not verbose:
return
if no_style:
print(_msg_format(body))
print(_msg_format(body), flush=True)
else:
style_print(_msg_format(body), style=message_type.value)
style_print(_msg_format(body), style=message_type.value, flush=True)


def error(body: Any):
Expand Down
26 changes: 8 additions & 18 deletions mnamer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,19 @@ def filename_replace(filename: str, replacements: dict[str, str]) -> str:
return base + container


def filter_blacklist(paths: list[Path], blacklist: list[str]) -> list[Path]:
def filter_blacklist(file_path: Path, blacklist: list[str]) -> bool:
"""Filters (set difference) paths by a collection of regex pattens."""
return [
path.absolute()
for path in paths
if not any(
re.search(pattern, str(path), re.IGNORECASE)
for pattern in blacklist
if pattern
)
]
return not any(
re.search(pattern, str(file_path), re.IGNORECASE)
for pattern in blacklist
if pattern
)


def filter_containers(
file_paths: list[Path], valid_containers: list[str]
) -> list[Path]:
def filter_containers(file_path: Path, valid_containers: list[str]) -> bool:
"""Filters (set intersection) a collection of containers."""
valid_containers = normalize_containers(valid_containers)
return [
file_path
for file_path in file_paths
if not valid_containers or file_path.suffix.lower() in valid_containers
]
return not valid_containers or file_path.suffix.lower() in valid_containers


def findall(s, ss) -> Iterator[int]:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ requests_cache ~= 0.9.7
setuptools_scm ~= 7.1.0
teletype ~= 1.3.4
typing-extensions ~= 4.7.1
watchdog ~= 3.0.0
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"test": False,
"verbose": False,
"version": False,
"watch": False,
}


Expand Down
Loading