diff --git a/rosidl_generator_py/cmake/rosidl_generator_py_generate_interfaces.cmake b/rosidl_generator_py/cmake/rosidl_generator_py_generate_interfaces.cmake
index de6a6c65..177b15c9 100644
--- a/rosidl_generator_py/cmake/rosidl_generator_py_generate_interfaces.cmake
+++ b/rosidl_generator_py/cmake/rosidl_generator_py_generate_interfaces.cmake
@@ -301,5 +301,10 @@ if(BUILD_TESTING AND rosidl_generate_interfaces_ADD_LINTER_TESTS)
# a value of zero tells uncrustify to ignore line length
MAX_LINE_LENGTH 0
"${_output_path}")
+
+ find_package(ament_cmake_mypy REQUIRED)
+ ament_mypy(
+ TESTNAME "mypy_rosidl_generated_py"
+ "${_output_path}")
endif()
endif()
diff --git a/rosidl_generator_py/package.xml b/rosidl_generator_py/package.xml
index f3ff744d..047313a8 100644
--- a/rosidl_generator_py/package.xml
+++ b/rosidl_generator_py/package.xml
@@ -35,6 +35,7 @@
ament_cmake_cppcheck
ament_cmake_cpplint
ament_cmake_flake8
+ ament_cmake_mypy
ament_cmake_pep257
ament_cmake_uncrustify
diff --git a/rosidl_generator_py/py.typed b/rosidl_generator_py/py.typed
new file mode 100644
index 00000000..e69de29b
diff --git a/rosidl_generator_py/resource/_action.py.em b/rosidl_generator_py/resource/_action.py.em
index 77fbf875..1acdc646 100644
--- a/rosidl_generator_py/resource/_action.py.em
+++ b/rosidl_generator_py/resource/_action.py.em
@@ -5,40 +5,50 @@ from rosidl_pycommon import convert_camel_case_to_lower_case_underscore
action_name = '_' + convert_camel_case_to_lower_case_underscore(action.namespaced_type.name)
module_name = '_' + convert_camel_case_to_lower_case_underscore(interface_path.stem)
+type_annotations_import_statements.add(f'from {".".join(action.namespaced_type.namespaces)}.{module_name} import {action.goal.structure.namespaced_type.name}')
+type_annotations_import_statements.add(f'from {".".join(action.namespaced_type.namespaces)}.{module_name} import {action.result.structure.namespaced_type.name}')
+type_annotations_import_statements.add(f'from {".".join(action.namespaced_type.namespaces)}.{module_name} import {action.feedback.structure.namespaced_type.name}')
+
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path,
- message=action.goal, import_statements=import_statements)
+ message=action.goal, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path,
- message=action.result, import_statements=import_statements)
+ message=action.result, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path,
- message=action.feedback, import_statements=import_statements)
+ message=action.feedback, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
TEMPLATE(
'_srv.py.em',
package_name=package_name, interface_path=interface_path,
- service=action.send_goal_service, import_statements=import_statements)
+ service=action.send_goal_service, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
TEMPLATE(
'_srv.py.em',
package_name=package_name, interface_path=interface_path,
- service=action.get_result_service, import_statements=import_statements)
+ service=action.get_result_service, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path,
- message=action.feedback_message, import_statements=import_statements)
+ message=action.feedback_message, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
}@
class Metaclass_@(action.namespaced_type.name)(type):
"""Metaclass of action '@(action.namespaced_type.name)'."""
- _TYPE_SUPPORT = None
+ _TYPE_SUPPORT: typing.ClassVar[typing.Optional[PyCapsule]] = None
@@classmethod
- def __import_type_support__(cls):
+ def __import_type_support__(cls) -> None:
try:
from rosidl_generator_py import import_type_support
module = import_type_support('@(package_name)')
@@ -92,5 +102,6 @@ class @(action.namespaced_type.name)(metaclass=Metaclass_@(action.namespaced_typ
# The generic message for get the status of a goal.
from action_msgs.msg._goal_status_array import GoalStatusArray as GoalStatusMessage
- def __init__(self):
+ # type ignore below fixed in mypy 1.0+ see mypy#10342
+ def __init__(self) -> typing.NoReturn: # type: ignore
raise NotImplementedError('Action classes can not be instantiated')
diff --git a/rosidl_generator_py/resource/_idl.py.em b/rosidl_generator_py/resource/_idl.py.em
index 3f49224a..068d6f12 100644
--- a/rosidl_generator_py/resource/_idl.py.em
+++ b/rosidl_generator_py/resource/_idl.py.em
@@ -2,11 +2,15 @@
# with input from @(package_name):@(interface_path)
# generated code does not contain a copyright notice
+from __future__ import annotations
+
+import collections.abc
+from os import getenv
+import typing
+
# This is being done at the module level and not on the instance level to avoid looking
# for the same variable multiple times on each instance. This variable is not supposed to
# change during runtime so it makes sense to only look for it once.
-from os import getenv
-
ros_python_check_fields = getenv('ROS_PYTHON_CHECK_FIELDS', default='')
@
@#######################################################################
@@ -19,6 +23,7 @@ ros_python_check_fields = getenv('ROS_PYTHON_CHECK_FIELDS', default='')
@#######################################################################
@{
import_statements = set()
+type_annotations_import_statements = set()
}@
@
@#######################################################################
@@ -32,7 +37,8 @@ from rosidl_parser.definition import Message
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path, message=message,
- import_statements=import_statements)
+ import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
}@
@[end for]@
@
@@ -47,7 +53,8 @@ from rosidl_parser.definition import Service
TEMPLATE(
'_srv.py.em',
package_name=package_name, interface_path=interface_path, service=service,
- import_statements=import_statements)
+ import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
}@
@[end for]@
@
@@ -62,6 +69,7 @@ from rosidl_parser.definition import Action
TEMPLATE(
'_action.py.em',
package_name=package_name, interface_path=interface_path, action=action,
- import_statements=import_statements)
+ import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
}@
@[end for]@
diff --git a/rosidl_generator_py/resource/_msg.py.em b/rosidl_generator_py/resource/_msg.py.em
index 61640c52..ac7bf47c 100644
--- a/rosidl_generator_py/resource/_msg.py.em
+++ b/rosidl_generator_py/resource/_msg.py.em
@@ -4,6 +4,9 @@
from rosidl_pycommon import convert_camel_case_to_lower_case_underscore
from rosidl_generator_py.generate_py_impl import constant_value_to_py
from rosidl_generator_py.generate_py_impl import get_python_type
+from rosidl_generator_py.generate_py_impl import get_type_annotation_constant
+from rosidl_generator_py.generate_py_impl import get_type_annotation_default
+from rosidl_generator_py.generate_py_impl import get_setter_and_getter_type
from rosidl_generator_py.generate_py_impl import SPECIAL_NESTED_BASIC_TYPES
from rosidl_generator_py.generate_py_impl import value_to_py
from rosidl_parser.definition import AbstractGenericString
@@ -29,6 +32,53 @@ from rosidl_parser.definition import SIGNED_INTEGER_TYPES
from rosidl_parser.definition import UnboundedSequence
from rosidl_parser.definition import UNSIGNED_INTEGER_TYPES
}@
+@{
+import_type_checking = False
+type_annotations_setter: dict[str, str] = {}
+type_annotations_getter: dict[str, str] = {}
+type_imports: set[str] = set()
+
+# Types which always exist
+# Done in one multi-line string to preserve order
+type_imports.add(
+ """from ctypes import Structure
+
+ class PyCapsule(Structure):
+ pass # don't need to define the full structure""")
+for member in message.structure.members:
+ setter_type, getter_type = get_setter_and_getter_type(member, type_imports)
+ type_annotations_setter[member.name] = setter_type
+ type_annotations_getter[member.name] = getter_type
+
+custom_type_annotations = {}
+
+for constant in message.constants:
+ custom_type_annotations[constant.name] = get_type_annotation_constant(constant)
+
+default_type_annotations = {}
+
+for member in message.structure.members:
+ if member.has_annotation('default'):
+ default_type_annotations[member.name] = get_type_annotation_default(member)
+}@
+@{
+suffix = '__'.join(message.structure.namespaced_type.namespaces[1:]) + '__' + convert_camel_case_to_lower_case_underscore(message.structure.namespaced_type.name)
+type_annotations_import_statements_copy = type_annotations_import_statements.copy()
+}@
+
+
+if typing.TYPE_CHECKING:
+@[for type_import in type_imports]@
+@[if type_import not in type_annotations_import_statements]@
+ @(type_import)
+@{
+type_annotations_import_statements.add(type_import)
+}@
+@[end if]@
+@[end for]@
+@[if type_annotations_import_statements == type_annotations_import_statements_copy]@
+ pass
+@[end if]@
@#<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
@# Collect necessary import statements for all members
@{
@@ -91,22 +141,31 @@ for member in message.structure.members:
class Metaclass_@(message.structure.namespaced_type.name)(type):
"""Metaclass of message '@(message.structure.namespaced_type.name)'."""
- _CREATE_ROS_MESSAGE = None
- _CONVERT_FROM_PY = None
- _CONVERT_TO_PY = None
- _DESTROY_ROS_MESSAGE = None
- _TYPE_SUPPORT = None
+ _CREATE_ROS_MESSAGE: typing.ClassVar[typing.Optional[PyCapsule]] = None
+ _CONVERT_FROM_PY: typing.ClassVar[typing.Optional[PyCapsule]] = None
+ _CONVERT_TO_PY: typing.ClassVar[typing.Optional[PyCapsule]] = None
+ _DESTROY_ROS_MESSAGE: typing.ClassVar[typing.Optional[PyCapsule]] = None
+ _TYPE_SUPPORT: typing.ClassVar[typing.Optional[PyCapsule]] = None
- __constants = {
+ class @(message.structure.namespaced_type.name)Constants(typing.TypedDict):
+@[if not custom_type_annotations]@
+ pass
+@[else]@
+@[for constant in message.constants]@
+ @(constant.name): @(custom_type_annotations[constant.name])
+@[ end for]@
+@[end if]@
+
+ __constants: @(message.structure.namespaced_type.name)Constants = {
@[for constant in message.constants]@
'@(constant.name)': @constant_value_to_py(constant.type, constant.value),
@[end for]@
}
@@classmethod
- def __import_type_support__(cls):
+ def __import_type_support__(cls) -> None:
try:
- from rosidl_generator_py import import_type_support
+ from rosidl_generator_py import import_type_support # type: ignore[attr-defined]
module = import_type_support('@(package_name)')
except ImportError:
import logging
@@ -117,9 +176,6 @@ class Metaclass_@(message.structure.namespaced_type.name)(type):
'Failed to import needed modules for type support:\n' +
traceback.format_exc())
else:
-@{
-suffix = '__'.join(message.structure.namespaced_type.namespaces[1:]) + '__' + convert_camel_case_to_lower_case_underscore(message.structure.namespaced_type.name)
-}@
cls._CREATE_ROS_MESSAGE = module.create_ros_message_msg__@(suffix)
cls._CONVERT_FROM_PY = module.convert_from_py_msg__@(suffix)
cls._CONVERT_TO_PY = module.convert_to_py_msg__@(suffix)
@@ -151,18 +207,18 @@ for member in message.structure.members:
@[for typename in sorted(importable_typesupports)]@
from @('.'.join(typename[:-2])) import @(typename[-2])
- if @(typename[-1]).__class__._TYPE_SUPPORT is None:
- @(typename[-1]).__class__.__import_type_support__()
+ if @(typename[-1])._TYPE_SUPPORT is None:
+ @(typename[-1]).__import_type_support__()
@[end for]@
@@classmethod
- def __prepare__(cls, name, bases, **kwargs):
+ def __prepare__(metacls, name: str, bases: tuple[type[typing.Any], ...], /, **kwds: typing.Any) -> collections.abc.MutableMapping[str, object]:
# list constant names here so that they appear in the help text of
# the message class under "Data and other attributes defined here:"
# as well as populate each message instance
return {
@[for constant in message.constants]@
- '@(constant.name)': cls.__constants['@(constant.name)'],
+ '@(constant.name)': metacls.__constants['@(constant.name)'],
@[end for]@
@[for member in message.structure.members]@
@[ if member.has_annotation('default')]@
@@ -173,7 +229,7 @@ for member in message.structure.members:
@[for constant in message.constants]@
@@property
- def @(constant.name)(self):
+ def @(constant.name)(self) -> @(custom_type_annotations[constant.name]):
"""Message constant '@(constant.name)'."""
return Metaclass_@(message.structure.namespaced_type.name).__constants['@(constant.name)']
@[end for]@
@@ -181,7 +237,7 @@ for member in message.structure.members:
@[ if member.has_annotation('default')]@
@@property
- def @(member.name.upper())__DEFAULT(cls):
+ def @(member.name.upper())__DEFAULT(cls) -> @(default_type_annotations[member.name]):
"""Return default value for message field '@(member.name)'."""
return @(value_to_py(member.type, member.get_annotation_value('default')['value']))
@[ end if]@
@@ -212,7 +268,7 @@ class @(message.structure.namespaced_type.name)(metaclass=Metaclass_@(message.st
'_check_fields',
]
- _fields_and_field_types = {
+ _fields_and_field_types: dict[str, str] = {
@[for member in message.structure.members]@
@[ if len(message.structure.members) == 1 and member.name == EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME]@
@[ continue]@
@@ -257,7 +313,7 @@ string@
# This attribute is used to store an rosidl_parser.definition variable
# related to the data type of each of the components the message.
- SLOT_TYPES = (
+ SLOT_TYPES: tuple[rosidl_parser.definition.AbstractType, ...] = (
@[for member in message.structure.members]@
@[ if len(message.structure.members) == 1 and member.name == EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME]@
@[ continue]@
@@ -293,15 +349,25 @@ if isinstance(type_, AbstractNestedType):
@[end for]@
)
- def __init__(self, **kwargs):
- if 'check_fields' in kwargs:
- self._check_fields = kwargs['check_fields']
+ def __init__(self, *,
+@[for member in message.structure.members]@
+@[ if len(message.structure.members) == 1 and member.name == EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME]@
+@[ continue]@
+@[ end if]@
+@{
+import inspect
+import builtins
+noqa_string = ''
+if member.name in dict(inspect.getmembers(builtins)).keys():
+ noqa_string = ', A002'
+}@
+ @(member.name): typing.Optional[@(type_annotations_setter[member.name])] = None, # noqa: E501@(noqa_string)
+@[end for]@
+ check_fields: typing.Optional[bool] = None) -> None:
+ if check_fields is not None:
+ self._check_fields = check_fields
else:
self._check_fields = ros_python_check_fields == '1'
- if self._check_fields:
- assert all('_' + key in self.__slots__ for key in kwargs.keys()), \
- 'Invalid arguments passed to constructor: %s' % \
- ', '.join(sorted(k for k in kwargs.keys() if '_' + k not in self.__slots__))
@[for member in message.structure.members]@
@[ if len(message.structure.members) == 1 and member.name == EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME]@
@[ continue]@
@@ -312,8 +378,7 @@ if isinstance(type_, AbstractNestedType):
type_ = type_.value_type
}@
@[ if member.has_annotation('default')]@
- self.@(member.name) = kwargs.get(
- '@(member.name)', @(message.structure.namespaced_type.name).@(member.name.upper())__DEFAULT)
+ self.@(member.name) = @(member.name) if @(member.name) is not None else @(message.structure.namespaced_type.name).@(member.name.upper())__DEFAULT
@[ else]@
@[ if isinstance(type_, NamespacedType) and not isinstance(member.type, AbstractSequence)]@
@[ if (
@@ -328,50 +393,41 @@ if isinstance(type_, AbstractNestedType):
@[ end if]@
@[ if isinstance(member.type, Array)]@
@[ if isinstance(type_, BasicType) and type_.typename == 'octet']@
- self.@(member.name) = kwargs.get(
- '@(member.name)',
- [bytes([0]) for x in range(@(member.type.size))]
- )
+ self.@(member.name) = @(member.name) if @(member.name) is not None else [bytes([0]) for x in range(@(member.type.size))]
@[ elif isinstance(type_, BasicType) and type_.typename in CHARACTER_TYPES]@
- self.@(member.name) = kwargs.get(
- '@(member.name)',
- [chr(0) for x in range(@(member.type.size))]
- )
+ self.@(member.name) = @(member.name) if @(member.name) is not None else [chr(0) for x in range(@(member.type.size))]
@[ else]@
@[ if isinstance(member.type.value_type, BasicType) and member.type.value_type.typename in SPECIAL_NESTED_BASIC_TYPES]@
- if '@(member.name)' not in kwargs:
+ if @(member.name) is None:
self.@(member.name) = numpy.zeros(@(member.type.size), dtype=@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['dtype']))
else:
- self.@(member.name) = numpy.array(kwargs.get('@(member.name)'), dtype=@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['dtype']))
+ self.@(member.name) = numpy.array(@(member.name), dtype=@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['dtype']))
assert self.@(member.name).shape == (@(member.type.size), )
@[ else]@
- self.@(member.name) = kwargs.get(
- '@(member.name)',
- [@(get_python_type(type_))() for x in range(@(member.type.size))]
- )
+ self.@(member.name) = @(member.name) if @(member.name) is not None else [@(get_python_type(type_))() for x in range(@(member.type.size))]
@[ end if]@
@[ end if]@
@[ elif isinstance(member.type, AbstractSequence)]@
@[ if isinstance(member.type.value_type, BasicType) and member.type.value_type.typename in SPECIAL_NESTED_BASIC_TYPES]@
- self.@(member.name) = array.array('@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['type_code'])', kwargs.get('@(member.name)', []))
+ self.@(member.name) = @(member.name) if @(member.name) is not None else array.array('@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['type_code'])', [])
@[ else]@
- self.@(member.name) = kwargs.get('@(member.name)', [])
+ self.@(member.name) = @(member.name) if @(member.name) is not None else []
@[ end if]@
@[ elif isinstance(type_, BasicType) and type_.typename == 'octet']@
- self.@(member.name) = kwargs.get('@(member.name)', bytes([0]))
+ self.@(member.name) = @(member.name) if @(member.name) is not None else bytes([0])
@[ elif isinstance(type_, BasicType) and type_.typename in CHARACTER_TYPES]@
- self.@(member.name) = kwargs.get('@(member.name)', chr(0))
+ self.@(member.name) = @(member.name) if @(member.name) is not None else chr(0)
@[ else]@
- self.@(member.name) = kwargs.get('@(member.name)', @(get_python_type(type_))())
+ self.@(member.name) = @(member.name) if @(member.name) is not None else @(get_python_type(type_))()
@[ end if]@
@[ end if]@
@[end for]@
- def __repr__(self):
+ def __repr__(self) -> str:
typename = self.__class__.__module__.split('.')
typename.pop()
typename.append(self.__class__.__name__)
- args = []
+ args: list[str] = []
for s, t in zip(self.get_fields_and_field_types().keys(), self.SLOT_TYPES):
field = getattr(self, s)
fieldstr = repr(field)
@@ -394,8 +450,8 @@ if isinstance(type_, AbstractNestedType):
args.append(s + '=' + fieldstr)
return '%s(%s)' % ('.'.join(typename), ', '.join(args))
- def __eq__(self, other):
- if not isinstance(other, self.__class__):
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, @(message.structure.namespaced_type.name)):
return False
@[for member in message.structure.members]@
@[ if len(message.structure.members) == 1 and member.name == EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME]@
@@ -411,7 +467,7 @@ if isinstance(type_, AbstractNestedType):
return True
@@classmethod
- def get_fields_and_field_types(cls):
+ def get_fields_and_field_types(cls) -> dict[str, str]:
from copy import copy
return copy(cls._fields_and_field_types)
@[for member in message.structure.members]@
@@ -424,19 +480,21 @@ type_ = member.type
if isinstance(type_, AbstractNestedType):
type_ = type_.value_type
-import inspect
-import builtins
noqa_string = ''
if member.name in dict(inspect.getmembers(builtins)).keys():
noqa_string = ' # noqa: A003'
+
+array_type_commment = ''
+if isinstance(member.type, (Array, AbstractSequence)):
+ array_type_commment = ' # typing.Annotated can be remove after mypy 1.16+ see mypy#3004'
}@
@@builtins.property@(noqa_string)
- def @(member.name)(self):@(noqa_string)
+ def @(member.name)(self) -> @(type_annotations_getter[member.name]):@(noqa_string)@(array_type_commment)
"""Message field '@(member.name)'."""
return self._@(member.name)
@@@(member.name).setter@(noqa_string)
- def @(member.name)(self, value):@(noqa_string)
+ def @(member.name)(self, value: @(type_annotations_setter[member.name])) -> None:@(noqa_string)
if self._check_fields:
@[ if isinstance(member.type, AbstractNestedType) and isinstance(member.type.value_type, BasicType) and member.type.value_type.typename in SPECIAL_NESTED_BASIC_TYPES]@
@[ if isinstance(member.type, Array)]@
@@ -477,8 +535,6 @@ if member.name in dict(inspect.getmembers(builtins)).keys():
from collections import UserString
@[ elif isinstance(type_, AbstractGenericString) and type_.has_maximum_size()]@
from collections import UserString
-@[ elif isinstance(type_, BasicType) and type_.typename == 'octet']@
- from collections.abc import ByteString
@[ elif isinstance(type_, BasicType) and type_.typename in CHARACTER_TYPES]@
from collections import UserString
@[ end if]@
@@ -551,7 +607,7 @@ bound = 1.7976931348623157e+308
isinstance(value, @(type_.name)), \
"The '@(member.name)' field must be a sub message of type '@(type_.name)'"
@[ elif isinstance(type_, BasicType) and type_.typename == 'octet']@
- (isinstance(value, (bytes, ByteString)) and
+ (isinstance(value, (bytes, bytearray, memoryview)) and
len(value) == 1), \
"The '@(member.name)' field must be of type 'bytes' or 'ByteString' with length 1"
@[ elif isinstance(type_, BasicType) and type_.typename == 'char']@
@@ -601,7 +657,8 @@ bound = 1.7976931348623157e+308
@[ if isinstance(member.type, Array)]@
self._@(member.name) = numpy.array(value, dtype=@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['dtype']))
@[ elif isinstance(member.type, AbstractSequence)]@
- self._@(member.name) = array.array('@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['type_code'])', value)
+ # type ignore below fixed in mypy 1.17+ see mypy#19421
+ self._@(member.name) = array.array('@(SPECIAL_NESTED_BASIC_TYPES[member.type.value_type.typename]['type_code'])', value) # type: ignore[assignment]
@[ end if]@
@[ else]@
self._@(member.name) = value
diff --git a/rosidl_generator_py/resource/_srv.py.em b/rosidl_generator_py/resource/_srv.py.em
index 24d4548f..ec55507f 100644
--- a/rosidl_generator_py/resource/_srv.py.em
+++ b/rosidl_generator_py/resource/_srv.py.em
@@ -5,28 +5,34 @@ from rosidl_pycommon import convert_camel_case_to_lower_case_underscore
service_name = '_' + convert_camel_case_to_lower_case_underscore(service.namespaced_type.name)
module_name = '_' + convert_camel_case_to_lower_case_underscore(interface_path.stem)
+type_annotations_import_statements.add(f'from {".".join(service.namespaced_type.namespaces)} import {service.request_message.structure.namespaced_type.name}')
+type_annotations_import_statements.add(f'from {".".join(service.namespaced_type.namespaces)} import {service.response_message.structure.namespaced_type.name}')
+
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path,
- message=service.request_message, import_statements=import_statements)
+ message=service.request_message, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path,
- message=service.response_message, import_statements=import_statements)
+ message=service.response_message, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
TEMPLATE(
'_msg.py.em',
package_name=package_name, interface_path=interface_path,
- message=service.event_message, import_statements=import_statements)
+ message=service.event_message, import_statements=import_statements,
+ type_annotations_import_statements=type_annotations_import_statements)
}@
class Metaclass_@(service.namespaced_type.name)(type):
"""Metaclass of service '@(service.namespaced_type.name)'."""
- _TYPE_SUPPORT = None
+ _TYPE_SUPPORT: typing.ClassVar[typing.Optional[PyCapsule]] = None
@@classmethod
- def __import_type_support__(cls):
+ def __import_type_support__(cls) -> None:
try:
from rosidl_generator_py import import_type_support
module = import_type_support('@(package_name)')
@@ -55,5 +61,6 @@ class @(service.namespaced_type.name)(metaclass=Metaclass_@(service.namespaced_t
from @('.'.join(service.namespaced_type.namespaces)).@(module_name) import @(service.response_message.structure.namespaced_type.name) as Response
from @('.'.join(service.namespaced_type.namespaces)).@(module_name) import @(service.event_message.structure.namespaced_type.name) as Event
- def __init__(self):
+ # type ignore below fixed in mypy 1.0+ see mypy#10342
+ def __init__(self) -> typing.NoReturn: # type: ignore
raise NotImplementedError('Service classes can not be instantiated')
diff --git a/rosidl_generator_py/rosidl_generator_py/__init__.py b/rosidl_generator_py/rosidl_generator_py/__init__.py
index de932669..ecf221fa 100644
--- a/rosidl_generator_py/rosidl_generator_py/__init__.py
+++ b/rosidl_generator_py/rosidl_generator_py/__init__.py
@@ -20,8 +20,7 @@
__all__ = ['import_type_support']
try:
- from .generate_py_impl import generate_py
- assert generate_py
+ from .generate_py_impl import generate_py # noqa: F401
__all__.append('generate_py')
except ImportError:
logger = logging.getLogger('rosidl_generator_py')
diff --git a/rosidl_generator_py/rosidl_generator_py/generate_py_impl.py b/rosidl_generator_py/rosidl_generator_py/generate_py_impl.py
index 0cbaaa10..43e15aeb 100644
--- a/rosidl_generator_py/rosidl_generator_py/generate_py_impl.py
+++ b/rosidl_generator_py/rosidl_generator_py/generate_py_impl.py
@@ -17,18 +17,25 @@
import os
import pathlib
import sys
+from typing import Union
from rosidl_parser.definition import AbstractGenericString
from rosidl_parser.definition import AbstractNestedType
from rosidl_parser.definition import AbstractSequence
+from rosidl_parser.definition import AbstractType
from rosidl_parser.definition import Action
+from rosidl_parser.definition import ACTION_FEEDBACK_SUFFIX
+from rosidl_parser.definition import ACTION_GOAL_SUFFIX
+from rosidl_parser.definition import ACTION_RESULT_SUFFIX
from rosidl_parser.definition import Array
from rosidl_parser.definition import BasicType
from rosidl_parser.definition import CHARACTER_TYPES
+from rosidl_parser.definition import Constant
from rosidl_parser.definition import FLOATING_POINT_TYPES
from rosidl_parser.definition import IdlContent
from rosidl_parser.definition import IdlLocator
from rosidl_parser.definition import INTEGER_TYPES
+from rosidl_parser.definition import Member
from rosidl_parser.definition import Message
from rosidl_parser.definition import NamespacedType
from rosidl_parser.definition import Service
@@ -130,36 +137,42 @@ def print_warning_if_reserved_keyword(member_name, interface_type, interface_nam
sorted((value, key) for (key, value) in module_names.items()):
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem} # noqa: F401\n')
+ f'{idl_stem} as {idl_stem} # noqa: F401\n')
if subfolder == 'srv':
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_Event # noqa: F401\n')
+ f'{idl_stem}_Event as {idl_stem}_Event # noqa: F401\n')
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_Request # noqa: F401\n')
+ f'{idl_stem}_Request as {idl_stem}_Request # noqa: F401\n')
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_Response # noqa: F401\n')
+ f'{idl_stem}_Response as {idl_stem}_Response # noqa: F401\n')
elif subfolder == 'action':
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_GetResult_Event # noqa: F401\n')
+ f'{idl_stem}_GetResult_Event as {idl_stem}_GetResult_Event'
+ ' # noqa: F401\n')
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_GetResult_Request # noqa: F401\n')
+ f'{idl_stem}_GetResult_Request as {idl_stem}_GetResult_Request'
+ ' # noqa: F401\n')
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_GetResult_Response # noqa: F401\n')
+ f'{idl_stem}_GetResult_Response as {idl_stem}_GetResult_Response'
+ ' # noqa: F401\n')
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_SendGoal_Event # noqa: F401\n')
+ f'{idl_stem}_SendGoal_Event as {idl_stem}_SendGoal_Event'
+ ' # noqa: F401\n')
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_SendGoal_Request # noqa: F401\n')
+ f'{idl_stem}_SendGoal_Request as {idl_stem}_SendGoal_Request'
+ ' # noqa: F401\n')
f.write(
f'from {package_name}.{subfolder}.{module_name} import '
- f'{idl_stem}_SendGoal_Response # noqa: F401\n')
+ f'{idl_stem}_SendGoal_Response as {idl_stem}_SendGoal_Response'
+ ' # noqa: F401\n')
# expand templates per available typesupport implementation
template_dir = args['template_dir']
@@ -192,6 +205,9 @@ def print_warning_if_reserved_keyword(member_name, interface_type, interface_nam
minimum_timestamp=latest_target_timestamp)
generated_files.append(generated_file)
+ # Generate py.typed to mark the generate files as having type support as according to PEP561.
+ (pathlib.Path(args['output_dir']) / 'py.typed').touch()
+
return generated_files
@@ -276,7 +292,7 @@ def constant_value_to_py(type_, value):
assert False, "unknown constant type '%s'" % type_
-def quoted_string(s):
+def quoted_string(s: str) -> str:
s = s.replace('\\', '\\\\')
# strings containing single quote but no double quotes can be wrapped in
# double quotes without escaping
@@ -288,7 +304,7 @@ def quoted_string(s):
return "'%s'" % s
-def get_python_type(type_):
+def get_python_type(type_: AbstractType) -> str:
if isinstance(type_, NamespacedType):
return type_.name
@@ -321,3 +337,119 @@ def get_python_type(type_):
return 'str'
assert False, "unknown type '%s'" % type_
+
+
+def get_type_annotation_constant(constant: Constant) -> str:
+ return get_type_annotation_constant_default(constant, constant.value)
+
+
+def get_type_annotation_default(member: Member) -> str:
+ default = member.get_annotation_value('default')
+ assert isinstance(default, dict), 'Only default types are dictionary'
+ return get_type_annotation_constant_default(member, default['value'])
+
+
+def get_type_annotation_constant_default(constant: Union[Constant, Member],
+ value: Union[str, int, float, bool]) -> str:
+ """From a constant/member return type annotations for constant/default values."""
+ type_ = constant.type
+
+ if isinstance(type_, AbstractNestedType):
+ type_ = type_.value_type
+
+ python_type = get_python_type(type_)
+
+ if (
+ isinstance(constant.type, AbstractNestedType) and isinstance(type_, BasicType) and
+ type_.typename in SPECIAL_NESTED_BASIC_TYPES
+ ):
+ if isinstance(constant.type, Array):
+ dtype = SPECIAL_NESTED_BASIC_TYPES[type_.typename]['dtype']
+ return f'numpy.typing.NDArray[{dtype}]'
+ elif isinstance(constant.type, AbstractSequence):
+ return f'array.array[{python_type}]'
+ elif isinstance(constant.type, AbstractNestedType):
+ return f'list[{python_type}]'
+ elif isinstance(type_, NamespacedType):
+ return python_type
+ elif isinstance(type_, float):
+ return 'float'
+ else:
+ if isinstance(value, str):
+ return f'typing.Literal[{quoted_string(value)}]'
+ elif isinstance(value, float):
+ return 'float'
+ elif isinstance(type_, BasicType) and type_.typename == 'octet':
+ return 'bytes'
+ else:
+ return f'typing.Literal[{value}]'
+
+ assert False, f"unknown type '{type_}'"
+
+
+def get_setter_and_getter_type(member: Member, type_imports: set[str]) -> tuple[str, str]:
+ """From a member return the setter and getter type annotations. Add needed type_imports."""
+ type_ = member.type
+
+ if isinstance(type_, AbstractNestedType):
+ type_ = type_.value_type
+
+ python_type = get_python_type(type_)
+
+ type_annotation = ''
+ type_annotations_getter = ''
+
+ if (
+ isinstance(member.type, AbstractNestedType) and isinstance(type_, BasicType) and
+ type_.typename in SPECIAL_NESTED_BASIC_TYPES
+ ):
+ if isinstance(member.type, Array):
+ type_imports.add('import numpy.typing')
+ dtype = SPECIAL_NESTED_BASIC_TYPES[type_.typename]['dtype']
+ type_annotation = f'numpy.typing.NDArray[{dtype}]'
+ elif isinstance(member.type, AbstractSequence):
+ type_annotation = f'array.array[{python_type}]'
+
+ # Using Annotated because of mypy#3004
+ type_annotations_getter = f'typing.Annotated[typing.Any, {type_annotation}]'
+
+ if isinstance(member.type, AbstractNestedType):
+ if type_annotation != '':
+ type_annotation = f'{type_annotation}, '
+ type_annotation = (f'typing.Union[{type_annotation}'
+ f'collections.abc.Sequence[{python_type}], '
+ f'collections.abc.Set[{python_type}], '
+ f'collections.UserList[{python_type}]]')
+
+ type_imports.add('import collections')
+ elif isinstance(member.type, AbstractGenericString) and member.type.has_maximum_size():
+ type_annotation = 'typing.Union[str, collections.UserString]'
+
+ type_imports.add('import collections')
+ elif isinstance(type_, BasicType) and type_.typename == 'char':
+ type_annotation = 'typing.Union[str, collection.UserString]'
+
+ type_imports.add('import collections')
+ elif isinstance(type_, BasicType) and type_.typename == 'octet':
+ type_annotation = 'typing.Union[bytes, collections.abc.ByteString]'
+ else:
+ type_annotation = python_type
+
+ if isinstance(type_, NamespacedType):
+ joined_type_namespaces = '.'.join(type_.namespaces)
+ if type_.name.endswith(ACTION_GOAL_SUFFIX) or type_.name.endswith(ACTION_RESULT_SUFFIX) \
+ or type_.name.endswith(ACTION_FEEDBACK_SUFFIX):
+
+ type_name_rsplit = type_.name.rsplit('_', 1)
+ lower_case_name = convert_camel_case_to_lower_case_underscore(type_name_rsplit[0])
+ type_imports.add(f'from {joined_type_namespaces}._{lower_case_name} '
+ f'import {type_.name}')
+ else:
+ type_imports.add(f'from {joined_type_namespaces} import {type_.name}')
+
+ type_annotations_setter = type_annotation
+
+ if type_annotations_getter == '':
+ type_annotations_getter = type_annotations_setter
+
+ return type_annotations_setter, type_annotations_getter
diff --git a/rosidl_generator_py/rosidl_generator_py/import_type_support_impl.py b/rosidl_generator_py/rosidl_generator_py/import_type_support_impl.py
index c16b0fb2..bea8a93e 100644
--- a/rosidl_generator_py/rosidl_generator_py/import_type_support_impl.py
+++ b/rosidl_generator_py/rosidl_generator_py/import_type_support_impl.py
@@ -13,6 +13,7 @@
# limitations under the License.
import importlib
+from types import ModuleType
from rpyutils import add_dll_directories_from_env
@@ -20,13 +21,13 @@
class UnsupportedTypeSupport(Exception):
"""Raised when typesupport couldn't be imported."""
- def __init__(self, pkg_name):
+ def __init__(self, pkg_name: str) -> None:
message = "Could not import 'rosidl_typesupport_c' for package '{0}'".format(pkg_name)
super(UnsupportedTypeSupport, self).__init__(message)
self.pkg_name = pkg_name
-def import_type_support(pkg_name):
+def import_type_support(pkg_name: str) -> ModuleType:
"""
Import the rosidl_typesupport_c module of a package.
diff --git a/rosidl_generator_py/rosidl_generator_py/py.typed b/rosidl_generator_py/rosidl_generator_py/py.typed
new file mode 100644
index 00000000..e69de29b
diff --git a/rosidl_generator_py/test/test_cli_extension.py b/rosidl_generator_py/test/test_cli_extension.py
index bcc5fbb6..238dee4c 100644
--- a/rosidl_generator_py/test/test_cli_extension.py
+++ b/rosidl_generator_py/test/test_cli_extension.py
@@ -15,12 +15,13 @@
import pathlib
from ament_index_python import get_resources
+from pytest import CaptureFixture
from rosidl_cli.command.generate.api import generate
PACKAGE_DIR = str(pathlib.Path(__file__).parent.parent)
-def test_cli_extension_for_smoke(tmp_path, capsys):
+def test_cli_extension_for_smoke(tmp_path: pathlib.Path, capsys: CaptureFixture[str]) -> None:
# NOTE(hidmic): pytest and empy do not play along,
# the latter expects some proxy will stay in sys.stdout
# and the former insists in overwriting it
diff --git a/rosidl_generator_py/test/test_interfaces.py b/rosidl_generator_py/test/test_interfaces.py
index ccaba590..a2ad7688 100644
--- a/rosidl_generator_py/test/test_interfaces.py
+++ b/rosidl_generator_py/test/test_interfaces.py
@@ -39,7 +39,7 @@
from rosidl_parser.definition import UnboundedString
-def test_basic_types():
+def test_basic_types() -> None:
msg = BasicTypes(check_fields=True)
# types
@@ -148,7 +148,7 @@ def test_basic_types():
assert math.isinf(msg.float64_value)
-def test_strings():
+def test_strings() -> None:
msg = Strings(check_fields=True)
# types
@@ -202,7 +202,7 @@ def test_strings():
setattr(msg, 'bounded_string_value_default1', 'a' * 23)
-def test_wstrings():
+def test_wstrings() -> None:
msg = WStrings(check_fields=True)
# types
@@ -216,7 +216,7 @@ def test_wstrings():
assert 'ハローワールド' == msg.wstring_value_default3
-def test_arrays_of_bounded_strings():
+def test_arrays_of_bounded_strings() -> None:
msg = StringArrays(check_fields=True)
array_valid_string_length = ['a' * 2, 'b' * 3, 'c' * 4]
array_too_long_strings = ['a' * 2, 'b' * 3, 'c' * 6]
@@ -237,7 +237,8 @@ def test_arrays_of_bounded_strings():
with pytest.raises(AssertionError):
setattr(msg, 'ub_string_ub_array_value', array_too_long_strings)
- array10strings = [] + [str(i) for i in range(10)]
+ # empty list + list[T] does not work until mypy 1.0+
+ array10strings: list[str] = [] + [str(i) for i in range(10)] # type: ignore
msg.ub_string_ub_array_value = array10strings
assert array10strings == msg.ub_string_ub_array_value
@@ -251,7 +252,7 @@ def test_arrays_of_bounded_strings():
with pytest.raises(AssertionError):
setattr(msg, 'ub_string_dynamic_array_value', array_too_long_strings)
- array10strings = [] + [str(i) for i in range(10)]
+ array10strings = [] + [str(i) for i in range(10)] # type: ignore
msg.ub_string_dynamic_array_value = array10strings
assert array10strings == msg.ub_string_dynamic_array_value
array10strings += ['gfg']
@@ -259,16 +260,16 @@ def test_arrays_of_bounded_strings():
assert array10strings == msg.ub_string_dynamic_array_value
-def test_constructor():
+def test_constructor() -> None:
msg = Strings(string_value='foo', check_fields=True)
assert 'foo' == msg.string_value
- with pytest.raises(AssertionError):
+ with pytest.raises(TypeError):
Strings(unknown_field='test', check_fields=True)
-def test_constants():
+def test_constants() -> None:
assert Constants.BOOL_CONST is True
assert bytes([50]) == Constants.BYTE_CONST
assert 100 == Constants.CHAR_CONST
@@ -288,7 +289,7 @@ def test_constants():
setattr(Constants, 'INT32_CONST', 42)
-def test_default_values():
+def test_default_values() -> None:
msg = Defaults(check_fields=True)
assert msg.bool_value is True
@@ -315,7 +316,7 @@ def test_default_values():
setattr(Defaults, 'INT32_VALUE__DEFAULT', 24)
-def test_arrays():
+def test_arrays() -> None:
msg = Arrays(check_fields=True)
# types
@@ -529,7 +530,7 @@ def test_arrays():
assert msg2 != msg3
-def test_bounded_sequences():
+def test_bounded_sequences() -> None:
msg = BoundedSequences(check_fields=True)
# types
@@ -753,7 +754,7 @@ def test_bounded_sequences():
setattr(msg, 'float64_values', [-float64_ieee_max_next, 0.0, float64_ieee_max_next])
-def test_unbounded_sequences():
+def test_unbounded_sequences() -> None:
msg = UnboundedSequences(check_fields=True)
# types
@@ -902,7 +903,7 @@ def test_unbounded_sequences():
setattr(msg, 'float64_values', [-float64_ieee_max_next, 0.0, float64_ieee_max_next])
-def test_slot_attributes():
+def test_slot_attributes() -> None:
msg = Nested(check_fields=True)
assert hasattr(msg, 'get_fields_and_field_types')
assert hasattr(msg, '__slots__')
@@ -917,7 +918,7 @@ def test_slot_attributes():
assert expected_slot_type == nested_slot_types_dict[expected_field]
-def test_string_slot_attributes():
+def test_string_slot_attributes() -> None:
msg = StringArrays(check_fields=True)
assert hasattr(msg, 'get_fields_and_field_types')
assert hasattr(msg, '__slots__')
@@ -943,7 +944,7 @@ def test_string_slot_attributes():
assert expected_slot_type == string_slot_types_dict[expected_field]
-def test_modifying_slot_fields_and_types():
+def test_modifying_slot_fields_and_types() -> None:
msg = StringArrays(check_fields=True)
assert hasattr(msg, 'get_fields_and_field_types')
string_slot_types_dict = getattr(msg, 'get_fields_and_field_types')()
@@ -952,7 +953,7 @@ def test_modifying_slot_fields_and_types():
assert len(getattr(msg, 'get_fields_and_field_types')()) == string_slot_types_dict_len
-def test_slot_types():
+def test_slot_types() -> None:
msg = Nested(check_fields=True)
assert hasattr(msg, 'SLOT_TYPES')
assert hasattr(msg, '__slots__')
@@ -962,7 +963,7 @@ def test_slot_types():
assert nested_slot_types[0].name == 'BasicTypes'
-def test_string_slot_types():
+def test_string_slot_types() -> None:
msg = StringArrays(check_fields=True)
assert hasattr(msg, 'SLOT_TYPES')
assert hasattr(msg, '__slots__')
@@ -990,7 +991,7 @@ def test_string_slot_types():
assert string_slot_types[4].size == 3
-def test_builtin_sequence_slot_attributes():
+def test_builtin_sequence_slot_attributes() -> None:
msg = BuiltinTypeSequencesIdl(check_fields=True)
assert hasattr(msg, 'get_fields_and_field_types')
assert hasattr(msg, '__slots__')
diff --git a/rosidl_generator_py/test/test_property.py b/rosidl_generator_py/test/test_property.py
index 75f3739f..6f389f0b 100644
--- a/rosidl_generator_py/test/test_property.py
+++ b/rosidl_generator_py/test/test_property.py
@@ -15,7 +15,7 @@
from rosidl_generator_py.msg import Property
-def test_msg_property():
+def test_msg_property() -> None:
msg = Property()
# types