diff --git a/ignis/services/hyprland/service.py b/ignis/services/hyprland/service.py index e3a8caa1..d79267ad 100644 --- a/ignis/services/hyprland/service.py +++ b/ignis/services/hyprland/service.py @@ -88,6 +88,7 @@ def __init__(self): self._active_workspace: dict[str, Any] = {} self._kb_layout: str = "" self._active_window: dict[str, Any] = {} + self._urgent_windows: set[str] = set() if self.is_available: self.__listen_events() @@ -114,6 +115,29 @@ def workspaces(self) -> list[dict[str, Any]]: """ return self._workspaces + @GObject.Property + def urgent_windows(self) -> list[str]: + """ + - read-only + + A list of urgent windows. + """ + return list(self._urgent_windows) + + @GObject.Property + def urgent_workspaces(self) -> list[str]: + """ + - read-only + + A list of urgent workspaces. + """ + clients = json.loads(self.send_command("j/clients")) + urgent_workspaces = [] + for i in clients: + if i["address"][len("0x"):] in self._urgent_windows: + urgent_workspaces.append(i["workspace"]["id"]) + return urgent_workspaces + @GObject.Property def active_workspace(self) -> dict[str, Any]: """ @@ -161,6 +185,9 @@ def __on_event_received(self, event: str) -> None: elif event.startswith("activewindow>>"): self.__sync_active_window() + elif event.startswith("urgent>>"): + self.__sync_urgent(event[len("urgent>>"):]) + def __sync_workspaces(self) -> None: self._workspaces = sorted( json.loads(self.send_command("j/workspaces")), key=lambda x: x["id"] @@ -177,7 +204,18 @@ def __sync_kb_layout(self) -> None: def __sync_active_window(self) -> None: self._active_window = json.loads(self.send_command("j/activewindow")) + if self._active_window: + active_window_id = self._active_window["address"][len("0x"):] + if active_window_id in self._urgent_windows: + self._urgent_windows.remove(active_window_id) self.notify("active_window") + self.notify("urgent_windows") + self.notify("urgent_workspaces") + + def __sync_urgent(self, urgent_window) -> None: + self._urgent_windows.add(urgent_window) + self.notify("urgent_windows") + self.notify("urgent_workspaces") def send_command(self, cmd: str) -> str: """