Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ros2cli/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<exec_depend>python3-packaging</exec_depend>
<exec_depend>python3-psutil</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>fzf</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
Expand Down
61 changes: 61 additions & 0 deletions ros2cli/ros2cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import functools
import inspect
import os
import subprocess
import sys
import time

from typing import Dict
from typing import List
from typing import Optional


def get_ros_domain_id():
Expand Down Expand Up @@ -133,3 +136,61 @@ def get_rmw_additional_env(rmw_implementation: str) -> Dict[str, str]:
return {
'RMW_IMPLEMENTATION': rmw_implementation,
}


def interactive_select(
items: List[str],
prompt: str = 'Select an item:'
) -> Optional[str]:
"""
Launch interactive fuzzy search using fzf to select from a list of items.

:param items: List of items to select from
:param prompt: Prompt message to display in fzf
:return: Selected item or None if user cancelled or fzf not available
"""
if not items:
print('No items available to select from.', file=sys.stderr)
return None

try:
# Check if fzf is available
result = subprocess.run(
['fzf', '--version'],
capture_output=True,
text=True,
timeout=1
)
if result.returncode != 0:
raise FileNotFoundError()
except (FileNotFoundError, subprocess.TimeoutExpired):
print(
'Error: fzf is not installed but is a dependency for this package. You can install it with rosdep',
file=sys.stderr
)
return None

try:
# Launch fzf with items as input - using direct TTY access
process = subprocess.Popen(
['fzf', '--prompt', prompt + ' ', '--height', '40%', '--reverse'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True
)

# Send items to fzf
stdout, _ = process.communicate(input='\n'.join(items))

# Check if user cancelled (Ctrl-C or ESC)
if process.returncode != 0:
return None

# Return selected item (strip newline)
selected = stdout.strip()
return selected if selected else None

except Exception as e:
print(f'Error during interactive selection: {e}', file=sys.stderr)
return None

17 changes: 17 additions & 0 deletions ros2interface/ros2interface/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,20 @@ def interface_to_yaml(identifier):
instance = interface()

return message_to_yaml(instance)


def get_all_interface_names():
"""Get all available interface names (messages, services, and actions)."""
types = []
for package_name, service_names in get_service_interfaces().items():
for service_name in service_names:
types.append(f'{package_name}/srv/{service_name}')

for package_name, message_names in get_message_interfaces().items():
for message_name in message_names:
types.append(f'{package_name}/msg/{message_name}')

for package_name, action_names in get_action_interfaces().items():
for action_name in action_names:
types.append(f'{package_name}/action/{action_name}')
return sorted(types)
17 changes: 17 additions & 0 deletions ros2interface/ros2interface/verb/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import sys
import typing

from ros2cli.helpers import interactive_select
from ros2interface.api import get_all_interface_names
from ros2interface.api import type_completer
from ros2interface.verb import VerbExtension
from rosidl_adapter.parser import \
Expand Down Expand Up @@ -190,13 +192,28 @@ def add_arguments(self, parser, cli_name):

arg = parser.add_argument(
'type',
nargs='?',
action=ReadStdinPipe,
help="Show an interface definition (e.g. 'example_interfaces/msg/String'). "
"If not provided, an interactive selection will be shown. "
"Passing '-' reads the argument from stdin (e.g. "
"'ros2 topic type /chatter | ros2 interface show -').")
arg.completer = type_completer

def main(self, *, args):
# If no type provided, launch interactive selection
if args.type is None:
interface_types = get_all_interface_names()

selected_type = interactive_select(
interface_types,
prompt='Select interface to show:')

if selected_type is None:
return 'No interface selected'

args.type = selected_type

try:
_show_interface(
args.type,
Expand Down
20 changes: 18 additions & 2 deletions ros2node/ros2node/verb/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import sys

from ros2cli.helpers import interactive_select
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_action_client_info
Expand All @@ -38,15 +39,30 @@ class InfoVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
add_arguments(parser)
argument = parser.add_argument(
'node_name',
help='Fully qualified node name to request information')
'node_name', nargs='?',
help='Fully qualified node name to request information. '
'If not provided, an interactive selection will be shown.')
argument.completer = NodeNameCompleter()
parser.add_argument(
'--include-hidden', action='store_true',
help='Display hidden topics, services, and actions as well')

def main(self, *, args):
with NodeStrategy(args) as node:
# If no node name provided, launch interactive selection
if args.node_name is None:
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
node_name_list = [n.full_name for n in node_names]

selected_node = interactive_select(
node_name_list,
prompt='Select node for info:')

if selected_node is None:
return 'No node selected'

args.node_name = selected_node

node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
count = [n.full_name for n in node_names].count(args.node_name)
if count > 1:
Expand Down
39 changes: 37 additions & 2 deletions ros2param/ros2param/verb/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
# limitations under the License.

from rcl_interfaces.msg import ParameterType
from ros2cli.helpers import interactive_select
from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_absolute_node_name
from ros2node.api import get_node_names
from ros2node.api import NodeNameCompleter
from ros2node.api import wait_for_node
from ros2param.api import call_get_parameters
from ros2param.api import call_list_parameters
from ros2param.api import ParameterNameCompleter
from ros2param.verb import VerbExtension

Expand All @@ -30,14 +33,16 @@ class GetVerb(VerbExtension):
def add_arguments(self, parser, cli_name): # noqa: D102
add_arguments(parser)
arg = parser.add_argument(
'node_name', help='Name of the ROS node')
'node_name', nargs='?',
help='Name of the ROS node. If not provided, an interactive selection will be shown.')
arg.completer = NodeNameCompleter(
include_hidden_nodes_key='include_hidden_nodes')
parser.add_argument(
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
arg = parser.add_argument(
'parameter_name', help='Name of the parameter')
'parameter_name', nargs='?',
help='Name of the parameter. If not provided, an interactive selection will be shown.')
arg.completer = ParameterNameCompleter()
parser.add_argument(
'--hide-type', action='store_true',
Expand All @@ -47,6 +52,36 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Wait for N seconds until node becomes available (default %(default)s sec)')

def main(self, *, args): # noqa: D102
# If no node name provided, launch interactive selection
if args.node_name is None:
with NodeStrategy(args) as node:
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden_nodes)
node_name_list = [n.full_name for n in node_names]

selected_node = interactive_select(
node_name_list,
prompt='Select node:')

if selected_node is None:
return 'No node selected'

args.node_name = selected_node

# If no parameter name provided, launch interactive selection
if args.parameter_name is None:
with DirectNode(args) as node:
parameter_names = call_list_parameters(
node=node, node_name=args.node_name)

selected_param = interactive_select(
parameter_names,
prompt='Select parameter:')

if selected_param is None:
return 'No parameter selected'

args.parameter_name = selected_param

node_name = get_absolute_node_name(args.node_name)
with NodeStrategy(args) as node:
if not wait_for_node(node, node_name, args.include_hidden_nodes, args.timeout):
Expand Down
24 changes: 22 additions & 2 deletions ros2service/ros2service/verb/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
from rclpy.qos import QoSProfile

from ros2cli.helpers import collect_stdin
from ros2cli.helpers import interactive_select
from ros2cli.node import NODE_NAME_PREFIX
from ros2cli.node.strategy import NodeStrategy
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import profile_configure_short_keys

from ros2service.api import get_service_names
from ros2service.api import ServiceNameCompleter
from ros2service.api import ServicePrototypeCompleter
from ros2service.api import ServiceTypeCompleter
Expand All @@ -40,8 +43,9 @@ class CallVerb(VerbExtension):

def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'service_name',
help="Name of the ROS service to call to (e.g. '/add_two_ints')")
'service_name', nargs='?',
help="Name of the ROS service to call to (e.g. '/add_two_ints'). "
"If not provided, an interactive selection will be shown.")
arg.completer = ServiceNameCompleter(
include_hidden_services_key='include_hidden_services')
arg = parser.add_argument(
Expand All @@ -66,6 +70,22 @@ def add_arguments(self, parser, cli_name):
add_qos_arguments(parser, 'service client', 'services_default')

def main(self, *, args):
# If no service name provided, launch interactive selection
if args.service_name is None:
with NodeStrategy(args) as node:
service_names = get_service_names(
node=node,
include_hidden_services=args.include_hidden_services)

selected_service = interactive_select(
service_names,
prompt='Select service to call:')

if selected_service is None:
return 'No service selected'

args.service_name = selected_service

if args.rate is not None and args.rate <= 0:
raise RuntimeError('rate must be greater than zero')
period = 1. / args.rate if args.rate else None
Expand Down
18 changes: 17 additions & 1 deletion ros2topic/ros2topic/verb/bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@

from rclpy.executors import ExternalShutdownException

from ros2cli.helpers import interactive_select
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import choose_qos

from ros2topic.api import get_msg_class
from ros2topic.api import get_topic_names
from ros2topic.api import get_topic_names_and_types
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand Down Expand Up @@ -101,8 +103,22 @@ def main(self, *, args):


def main(args):
# Interactive selection if no topic name and not --all
if not args.all_topics and not args.topic_name:
raise RuntimeError('Either specify topic names or use --all/-a option')
with DirectNode(args) as node:
topic_names = get_topic_names(
node=node.node,
include_hidden_topics=args.include_hidden_topics)

selected_topic = interactive_select(
topic_names,
prompt='Select topic for bw:')

if selected_topic is None:
return 'No topic selected'

args.topic_name = [selected_topic]

if args.all_topics and args.topic_name:
raise RuntimeError('Cannot specify both --all/-a and topic names')

Expand Down
23 changes: 21 additions & 2 deletions ros2topic/ros2topic/verb/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
from rclpy.qos import QoSProfile
from rclpy.task import Future

from ros2cli.helpers import interactive_select
from ros2cli.helpers import unsigned_int
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import choose_qos

from ros2topic.api import get_msg_class
from ros2topic.api import get_topic_names
from ros2topic.api import positive_float
from ros2topic.api import TopicNameCompleter
from ros2topic.verb import VerbExtension
Expand All @@ -51,8 +53,9 @@ def add_arguments(self, parser, cli_name):
add_strategy_node_arguments(parser)

arg = parser.add_argument(
'topic_name',
help="Name of the ROS topic to listen to (e.g. '/chatter')")
'topic_name', nargs='?',
help="Name of the ROS topic to listen to (e.g. '/chatter'). "
"If not provided, an interactive selection will be shown.")
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
parser.add_argument(
Expand Down Expand Up @@ -121,6 +124,22 @@ def add_arguments(self, parser, cli_name):

def main(self, *, args):

# If no topic name provided, launch interactive selection
if args.topic_name is None:
with NodeStrategy(args) as node:
topic_names = get_topic_names(
node=node,
include_hidden_topics=args.include_hidden_topics)

selected_topic = interactive_select(
topic_names,
prompt='Select topic to echo:')

if selected_topic is None:
return 'No topic selected'

args.topic_name = selected_topic

self.csv = args.csv

# Validate field selection
Expand Down
Loading