Skip to content

asyncio: Asynchronous libtmux #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dev-dependencies = [
"typing-extensions; python_version < '3.11'",
"gp-libs",
"pytest",
"pytest-asyncio",
"pytest-rerunfailures",
"pytest-mock",
"pytest-watcher",
Expand Down Expand Up @@ -98,6 +99,7 @@ testing = [
"typing-extensions; python_version < '3.11'",
"gp-libs",
"pytest",
"pytest-asyncio",
"pytest-rerunfailures",
"pytest-mock",
"pytest-watcher",
Expand Down
144 changes: 144 additions & 0 deletions src/libtmux/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from __future__ import annotations

import asyncio
import logging
import re
import shutil
Expand Down Expand Up @@ -267,6 +268,149 @@ def __init__(self, *args: t.Any) -> None:
)


class AsyncTmuxCmd:
"""
An asyncio-compatible class for running any tmux command via subprocess.

Attributes
----------
cmd : list[str]
The full command (including the "tmux" binary path).
stdout : list[str]
Lines of stdout output from tmux.
stderr : list[str]
Lines of stderr output from tmux.
returncode : int
The process return code.

Examples
--------
>>> import asyncio
>>>
>>> async def main():
... proc = await AsyncTmuxCmd.run('-V')
... if proc.stderr:
... raise exc.LibTmuxException(
... f"Error invoking tmux: {proc.stderr}"
... )
... print("tmux version:", proc.stdout)
...
>>> asyncio.run(main())
tmux version: [...]

This is equivalent to calling:

.. code-block:: console

$ tmux -V
"""

def __init__(
self,
cmd: list[str],
stdout: list[str],
stderr: list[str],
returncode: int,
) -> None:
"""
Store the results of a completed tmux subprocess run.

Parameters
----------
cmd : list[str]
The command used to invoke tmux.
stdout : list[str]
Captured lines from tmux stdout.
stderr : list[str]
Captured lines from tmux stderr.
returncode : int
Subprocess exit code.
"""
self.cmd: list[str] = cmd
self.stdout: list[str] = stdout
self.stderr: list[str] = stderr
self.returncode: int = returncode

@classmethod
async def run(cls, *args: t.Any) -> AsyncTmuxCmd:
"""
Execute a tmux command asynchronously and capture its output.

Parameters
----------
*args : str
Arguments to be passed after the "tmux" binary name.

Returns
-------
AsyncTmuxCmd
An instance containing the cmd, stdout, stderr, and returncode.

Raises
------
exc.TmuxCommandNotFound
If no "tmux" executable is found in the user's PATH.
exc.LibTmuxException
If there's any unexpected exception creating or communicating
with the tmux subprocess.
"""
tmux_bin: str | None = shutil.which("tmux")
if not tmux_bin:
msg = "tmux executable not found in PATH"
raise exc.TmuxCommandNotFound(
msg,
)

cmd: list[str] = [tmux_bin] + [str(c) for c in args]

try:
process: asyncio.subprocess.Process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_bytes, stderr_bytes = await process.communicate()
returncode: int = (
process.returncode if process.returncode is not None else -1
)

except Exception as e:
logger.exception("Exception for %s", " ".join(cmd))
msg = f"Exception while running tmux command: {e}"
raise exc.LibTmuxException(
msg,
) from e

# Decode bytes to string with error handling
stdout = stdout_bytes.decode(errors="backslashreplace")
stderr = stderr_bytes.decode(errors="backslashreplace")

# Split on newlines and filter empty lines
stdout_split: list[str] = stdout.split("\n")
# remove trailing newlines from stdout
while stdout_split and stdout_split[-1] == "":
stdout_split.pop()

stderr_split = stderr.split("\n")
stderr_split = list(filter(None, stderr_split)) # filter empty values

# Workaround for tmux "has-session" command behavior
if "has-session" in cmd and stderr_split and not stdout_split:
# If `has-session` fails, it might output an error on stderr
# with nothing on stdout. We replicate the original logic here:
stdout_split = [stderr_split[0]]

logger.debug("stdout for %s: %s", " ".join(cmd), stdout_split)
logger.debug("stderr for %s: %s", " ".join(cmd), stderr_split)

return cls(
cmd=cmd,
stdout=stdout_split,
stderr=stderr_split,
returncode=returncode,
)


def get_version() -> LooseVersion:
"""Return tmux version.

Expand Down
49 changes: 48 additions & 1 deletion src/libtmux/pane.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import typing as t
import warnings

from libtmux.common import has_gte_version, has_lt_version, tmux_cmd
from libtmux.common import AsyncTmuxCmd, has_gte_version, has_lt_version, tmux_cmd
from libtmux.constants import (
PANE_DIRECTION_FLAG_MAP,
RESIZE_ADJUSTMENT_DIRECTION_FLAG_MAP,
Expand Down Expand Up @@ -201,6 +201,53 @@ def cmd(

return self.server.cmd(cmd, *args, target=target)

async def acmd(
self,
cmd: str,
*args: t.Any,
target: str | int | None = None,
) -> AsyncTmuxCmd:
"""Execute tmux subcommand within pane context.

Automatically binds target by adding ``-t`` for object's pane ID to the
command. Pass ``target`` to keyword arguments to override.

Examples
--------
>>> import asyncio
>>> async def test_acmd():
... result = await pane.acmd('split-window', '-P')
... print(result.stdout[0])
>>> asyncio.run(test_acmd())
libtmux...:...

From raw output to an enriched `Pane` object:

>>> async def test_from_pane():
... pane_id_result = await pane.acmd(
... 'split-window', '-P', '-F#{pane_id}'
... )
... return Pane.from_pane_id(
... pane_id=pane_id_result.stdout[0],
... server=session.server
... )
>>> asyncio.run(test_from_pane())
Pane(%... Window(@... ...:..., Session($1 libtmux_...)))

Parameters
----------
target : str, optional
Optional custom target override. By default, the target is the pane ID.

Returns
-------
:meth:`server.cmd`
"""
if target is None:
target = self.pane_id

return await self.server.acmd(cmd, *args, target=target)

"""
Commands (tmux-like)
"""
Expand Down
97 changes: 94 additions & 3 deletions src/libtmux/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from . import exc, formats
from .common import (
AsyncTmuxCmd,
EnvironmentMixin,
PaneDict,
SessionDict,
Expand Down Expand Up @@ -251,8 +252,12 @@ def cmd(

Output of `tmux -L ... new-window -P -F#{window_id}` to a `Window` object:

>>> Window.from_window_id(window_id=session.cmd(
... 'new-window', '-P', '-F#{window_id}').stdout[0], server=session.server)
>>> Window.from_window_id(
... window_id=session.cmd(
... 'new-window', '-P', '-F#{window_id}'
... ).stdout[0],
... server=session.server,
... )
Window(@4 3:..., Session($1 libtmux_...))

Create a pane from a window:
Expand All @@ -263,7 +268,9 @@ def cmd(
Output of `tmux -L ... split-window -P -F#{pane_id}` to a `Pane` object:

>>> Pane.from_pane_id(pane_id=window.cmd(
... 'split-window', '-P', '-F#{pane_id}').stdout[0], server=window.server)
... 'split-window', '-P', '-F#{pane_id}').stdout[0],
... server=window.server
... )
Pane(%... Window(@... ...:..., Session($1 libtmux_...)))

Parameters
Expand Down Expand Up @@ -301,6 +308,90 @@ def cmd(

return tmux_cmd(*svr_args, *cmd_args)

async def acmd(
self,
cmd: str,
*args: t.Any,
target: str | int | None = None,
) -> AsyncTmuxCmd:
"""Execute tmux command respective of socket name and file, return output.

Examples
--------
>>> import asyncio
>>> async def test_acmd():
... result = await server.acmd('display-message', 'hi')
... print(result.stdout)
>>> asyncio.run(test_acmd())
[]

New session:

>>> async def test_new_session():
... result = await server.acmd(
... 'new-session', '-d', '-P', '-F#{session_id}'
... )
... print(result.stdout[0])
>>> asyncio.run(test_new_session())
$...

Output of `tmux -L ... new-window -P -F#{window_id}` to a `Window` object:

>>> async def test_new_window():
... result = await session.acmd('new-window', '-P', '-F#{window_id}')
... window_id = result.stdout[0]
... window = Window.from_window_id(window_id=window_id, server=server)
... print(window)
>>> asyncio.run(test_new_window())
Window(@... ...:..., Session($... libtmux_...))

Create a pane from a window:

>>> async def test_split_window():
... result = await server.acmd('split-window', '-P', '-F#{pane_id}')
... print(result.stdout[0])
>>> asyncio.run(test_split_window())
%...

Output of `tmux -L ... split-window -P -F#{pane_id}` to a `Pane` object:

>>> async def test_pane():
... result = await window.acmd('split-window', '-P', '-F#{pane_id}')
... pane_id = result.stdout[0]
... pane = Pane.from_pane_id(pane_id=pane_id, server=server)
... print(pane)
>>> asyncio.run(test_pane())
Pane(%... Window(@... ...:..., Session($1 libtmux_...)))

Parameters
----------
target : str, optional
Optional custom target.

Returns
-------
:class:`common.AsyncTmuxCmd`
"""
svr_args: list[str | int] = [cmd]
cmd_args: list[str | int] = []
if self.socket_name:
svr_args.insert(0, f"-L{self.socket_name}")
if self.socket_path:
svr_args.insert(0, f"-S{self.socket_path}")
if self.config_file:
svr_args.insert(0, f"-f{self.config_file}")
if self.colors:
if self.colors == 256:
svr_args.insert(0, "-2")
elif self.colors == 88:
svr_args.insert(0, "-8")
else:
raise exc.UnknownColorOption

cmd_args = ["-t", str(target), *args] if target is not None else [*args]

return await AsyncTmuxCmd.run(*svr_args, *cmd_args)

@property
def attached_sessions(self) -> list[Session]:
"""Return active :class:`Session`s.
Expand Down
Loading
Loading