Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ros2cli/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<exec_depend>python3-packaging</exec_depend>
<exec_depend>python3-psutil</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>fzf</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
Expand Down
71 changes: 71 additions & 0 deletions ros2cli/ros2cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import functools
import inspect
import os
import shutil
import subprocess
import sys
import time

from typing import Dict
from typing import List
from typing import Optional


def get_ros_domain_id():
Expand Down Expand Up @@ -133,3 +137,70 @@ def get_rmw_additional_env(rmw_implementation: str) -> Dict[str, str]:
return {
'RMW_IMPLEMENTATION': rmw_implementation,
}


def interactive_select(
items: List[str],
prompt: str = 'Select an item:'
) -> Optional[str]:
"""
Launch interactive fuzzy search using fzf to select from a list of items.

:param items: List of items to select from
:param prompt: Prompt message to display in fzf
:return: Selected item or None if user cancelled or fzf not available
"""
if not items:
print('No items available to select from.', file=sys.stderr)
return None

# Check if we're in an interactive terminal
if not sys.stdin.isatty() or not sys.stdout.isatty():
print('Error: Interactive selection requires a TTY terminal.', file=sys.stderr)
return None

# Check if fzf is available
if shutil.which('fzf') is None:
print(
'Error: fzf is not installed...',
file=sys.stderr
)
return None

try:
# Launch fzf with items as input
process = subprocess.Popen(
['fzf', '--prompt', prompt + ' ', '--height', '40%', '--reverse'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True
)

try:
# Send items to fzf
stdout, _ = process.communicate(input='\n'.join(items))
except KeyboardInterrupt:
# Handle Ctrl+C gracefully to avoid leaving terminal in bad state
process.terminate()
process.wait()
# Reset terminal to normal mode after fzf interruption
subprocess.run(['stty', 'sane'], check=False)
return None
finally:
# Ensure terminal is restored even if an exception occurs
if process.poll() is None:
process.terminate()
process.wait()
subprocess.run(['stty', 'sane'], check=False)

# Check if user cancelled (Ctrl-C or ESC)
if process.returncode != 0:
return None

# Return selected item (strip newline)
selected = stdout.strip()
return selected if selected else None

except (OSError, subprocess.SubprocessError) as e:
print(f'Error during interactive selection: {e}', file=sys.stderr)
return None
17 changes: 17 additions & 0 deletions ros2interface/ros2interface/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,20 @@ def interface_to_yaml(identifier):
instance = interface()

return message_to_yaml(instance)


def get_all_interface_names():
"""Get all available interface names (messages, services, and actions)."""
types = []
for package_name, service_names in get_service_interfaces().items():
for service_name in service_names:
types.append(f'{package_name}/{service_name}')

for package_name, message_names in get_message_interfaces().items():
for message_name in message_names:
types.append(f'{package_name}/{message_name}')

for package_name, action_names in get_action_interfaces().items():
for action_name in action_names:
types.append(f'{package_name}/{action_name}')
return sorted(types)
24 changes: 22 additions & 2 deletions ros2interface/ros2interface/verb/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import sys
import typing

from ros2cli.helpers import interactive_select
from ros2interface.api import get_all_interface_names
from ros2interface.api import type_completer
from ros2interface.verb import VerbExtension
from rosidl_adapter.parser import \
Expand Down Expand Up @@ -167,8 +169,8 @@ def __call__(self, parser, namespace, values, option_string=None):
if sys.stdin.isatty():
parser.error('expected stdin pipe')
values = sys.stdin.readline().strip()
if not values:
parser.error('the passed value is empty')
if not values:
parser.error('the passed value is empty')
setattr(namespace, self.dest, values)


Expand All @@ -190,13 +192,31 @@ def add_arguments(self, parser, cli_name):

arg = parser.add_argument(
'type',
nargs='?',
action=ReadStdinPipe,
help="Show an interface definition (e.g. 'example_interfaces/msg/String'). "
"If not provided, an interactive selection will be shown. "
"Passing '-' reads the argument from stdin (e.g. "
"'ros2 topic type /chatter | ros2 interface show -').")
arg.completer = type_completer

def main(self, *, args):
# If no type provided, launch interactive selection
if args.type is None:
interface_types = get_all_interface_names()

if not interface_types:
return 'No interfaces available to select from.'

selected_type = interactive_select(
interface_types,
prompt='Select interface to show:')

if selected_type is None:
return 'No interface selected'

args.type = selected_type

try:
_show_interface(
args.type,
Expand Down
24 changes: 22 additions & 2 deletions ros2node/ros2node/verb/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import sys

from ros2cli.helpers import interactive_select
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_action_client_info
Expand All @@ -38,15 +39,34 @@ class InfoVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
add_arguments(parser)
argument = parser.add_argument(
'node_name',
help='Fully qualified node name to request information')
'node_name', nargs='?',
help='Fully qualified node name to request information. '
'If not provided, an interactive selection will be shown.')
argument.completer = NodeNameCompleter()
parser.add_argument(
'--include-hidden', action='store_true',
help='Display hidden topics, services, and actions as well')

def main(self, *, args):
with NodeStrategy(args) as node:
# If no node name provided, launch interactive selection
if args.node_name is None:
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
node_name_list = [n.full_name for n in node_names]

# Check if there are any nodes before attempting selection
if not node_name_list:
return 'No nodes available to select from.'

selected_node = interactive_select(
node_name_list,
prompt='Select node for info:')

if selected_node is None:
return None

args.node_name = selected_node

node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
count = [n.full_name for n in node_names].count(args.node_name)
if count > 1:
Expand Down
54 changes: 52 additions & 2 deletions ros2param/ros2param/verb/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
# limitations under the License.

from rcl_interfaces.msg import ParameterType
from ros2cli.helpers import interactive_select
from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_absolute_node_name
from ros2node.api import get_node_names
from ros2node.api import NodeNameCompleter
from ros2node.api import wait_for_node
from ros2param.api import call_get_parameters
from ros2param.api import call_list_parameters
from ros2param.api import ParameterNameCompleter
from ros2param.verb import VerbExtension

Expand All @@ -30,14 +33,16 @@ class GetVerb(VerbExtension):
def add_arguments(self, parser, cli_name): # noqa: D102
add_arguments(parser)
arg = parser.add_argument(
'node_name', help='Name of the ROS node')
'node_name', nargs='?',
help='Name of the ROS node. If not provided, an interactive selection will be shown.')
arg.completer = NodeNameCompleter(
include_hidden_nodes_key='include_hidden_nodes')
parser.add_argument(
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
arg = parser.add_argument(
'parameter_name', help='Name of the parameter')
'parameter_name', nargs='?',
help='Name of the parameter. If not provided, an interactive selection will be shown.')
arg.completer = ParameterNameCompleter()
parser.add_argument(
'--hide-type', action='store_true',
Expand All @@ -47,6 +52,51 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Wait for N seconds until node becomes available (default %(default)s sec)')

def main(self, *, args): # noqa: D102
# If no node name provided, launch interactive selection
if args.node_name is None:
with NodeStrategy(args) as node:
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden_nodes)
node_name_list = [n.full_name for n in node_names]

if not node_name_list:
return 'No nodes available to select from.'

selected_node = interactive_select(
node_name_list,
prompt='Select node:')

if selected_node is None:
return 'No node selected'

args.node_name = selected_node

# If no parameter name provided, launch interactive selection
if args.parameter_name is None:
with DirectNode(args) as node:
response = call_list_parameters(
node=node, node_name=args.node_name)

if response is None:
return 'Unable to get parameters: service call timed out.'

if response.result() is None:
return 'Unable to get parameters: service call failed.'

parameter_names = response.result().result.names

if not parameter_names:
return 'No parameters available to select from.'

selected_param = interactive_select(
parameter_names,
prompt='Select parameter:')

if selected_param is None:
# Only show this if interactive_select didn't already print an error
return None

args.parameter_name = selected_param

node_name = get_absolute_node_name(args.node_name)
with NodeStrategy(args) as node:
if not wait_for_node(node, node_name, args.include_hidden_nodes, args.timeout):
Expand Down
27 changes: 25 additions & 2 deletions ros2service/ros2service/verb/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
from rclpy.qos import QoSProfile

from ros2cli.helpers import collect_stdin
from ros2cli.helpers import interactive_select
from ros2cli.node import NODE_NAME_PREFIX
from ros2cli.node.strategy import NodeStrategy
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import profile_configure_short_keys

from ros2service.api import get_service_names
from ros2service.api import ServiceNameCompleter
from ros2service.api import ServicePrototypeCompleter
from ros2service.api import ServiceTypeCompleter
Expand All @@ -40,8 +43,9 @@ class CallVerb(VerbExtension):

def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'service_name',
help="Name of the ROS service to call to (e.g. '/add_two_ints')")
'service_name', nargs='?',
help="Name of the ROS service to call to (e.g. '/add_two_ints'). "
"If not provided, an interactive selection will be shown.")
arg.completer = ServiceNameCompleter(
include_hidden_services_key='include_hidden_services')
arg = parser.add_argument(
Expand All @@ -66,6 +70,25 @@ def add_arguments(self, parser, cli_name):
add_qos_arguments(parser, 'service client', 'services_default')

def main(self, *, args):
# If no service name provided, launch interactive selection
if args.service_name is None:
with NodeStrategy(args) as node:
service_names = get_service_names(
node=node,
include_hidden_services=args.include_hidden_services)

if not service_names:
return 'No services available to select from.'

selected_service = interactive_select(
service_names,
prompt='Select service to call:')

if selected_service is None:
return None

args.service_name = selected_service

if args.rate is not None and args.rate <= 0:
raise RuntimeError('rate must be greater than zero')
period = 1. / args.rate if args.rate else None
Expand Down
21 changes: 20 additions & 1 deletion ros2topic/ros2topic/verb/bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@

from rclpy.executors import ExternalShutdownException

from ros2cli.helpers import interactive_select
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import choose_qos

from ros2topic.api import get_msg_class
from ros2topic.api import get_topic_names
from ros2topic.api import get_topic_names_and_types
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand Down Expand Up @@ -101,8 +103,25 @@ def main(self, *, args):


def main(args):
# Interactive selection if no topic name and not --all
if not args.all_topics and not args.topic_name:
raise RuntimeError('Either specify topic names or use --all/-a option')
with DirectNode(args) as node:
topic_names = get_topic_names(
node=node.node,
include_hidden_topics=args.include_hidden_topics)

if not topic_names:
return 'No topics available to select from.'

selected_topic = interactive_select(
topic_names,
prompt='Select topic for bw:')

if selected_topic is None:
return None

args.topic_name = [selected_topic]

if args.all_topics and args.topic_name:
raise RuntimeError('Cannot specify both --all/-a and topic names')

Expand Down
Loading