roboto.domain.topics.decoded_message#

Module Contents#

class roboto.domain.topics.decoded_message.AttrGetter#

Bases: abc.ABC

Abstract base class for accessing attributes from decoded messages.

Provides a unified interface for extracting attributes from different types of decoded message data, whether they are dictionaries (JSON) or dynamically created classes (ROS1/ROS2). Used by message decoders to handle various message encoding formats in a consistent way.

static get_attribute(value, attribute)#

Abstract method.

Get the value of a specific attribute from the given value.

Parameters:
  • value – The decoded message value to access.

  • attribute – Name of the attribute to retrieve.

Returns:

The value of the specified attribute.

Return type:

Any

static get_attribute_names(value)#

Abstract method.

Get the names of all attributes available in the given value.

Parameters:

value – The decoded message value to inspect.

Returns:

Sequence of attribute names available in the value.

Return type:

collections.abc.Sequence[str]

static has_sub_attributes(value)#

Abstract method.

Check if the given value has nested attributes that can be accessed.

Parameters:

value – The decoded message value to inspect.

Returns:

True if the value has nested attributes, False otherwise.

Return type:

bool
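
The three abstract methods above are enough to walk a decoded value without knowing its concrete type. The following is a minimal sketch of that idea, not the library's implementation: `AttrGetterSketch`, `NamespaceGetter`, and `leaf_paths` are hypothetical names introduced for illustration, and `types.SimpleNamespace` stands in for a decoded message.

```python
import abc
from collections.abc import Sequence
from types import SimpleNamespace
from typing import Any


class AttrGetterSketch(abc.ABC):
    """Hypothetical stand-in mirroring the three documented static methods."""

    @staticmethod
    @abc.abstractmethod
    def get_attribute(value: Any, attribute: str) -> Any: ...

    @staticmethod
    @abc.abstractmethod
    def get_attribute_names(value: Any) -> Sequence[str]: ...

    @staticmethod
    @abc.abstractmethod
    def has_sub_attributes(value: Any) -> bool: ...


class NamespaceGetter(AttrGetterSketch):
    """Toy concrete getter for SimpleNamespace values (illustration only)."""

    @staticmethod
    def get_attribute(value: Any, attribute: str) -> Any:
        return getattr(value, attribute)

    @staticmethod
    def get_attribute_names(value: Any) -> Sequence[str]:
        return list(vars(value))

    @staticmethod
    def has_sub_attributes(value: Any) -> bool:
        return isinstance(value, SimpleNamespace)


def leaf_paths(value: Any, getter: type[AttrGetterSketch], prefix: str = "") -> list[str]:
    """List dotted paths to every leaf, using only the getter interface."""
    paths: list[str] = []
    for name in getter.get_attribute_names(value):
        child = getter.get_attribute(value, name)
        path = f"{prefix}.{name}" if prefix else name
        if getter.has_sub_attributes(child):
            # Descend into nested values; the getter decides what counts as nested.
            paths.extend(leaf_paths(child, getter, path))
        else:
            paths.append(path)
    return paths


msg = SimpleNamespace(pose=SimpleNamespace(position=SimpleNamespace(x=1.5)), velocity=2.0)
print(leaf_paths(msg, NamespaceGetter))  # ['pose.position.x', 'velocity']
```

Because the traversal only calls the three interface methods, swapping in a different getter changes how values are inspected without changing the walk itself.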

class roboto.domain.topics.decoded_message.ClassAttrGetter#

Bases: AttrGetter

Attribute getter for class-based decoded data.

Handles access to attributes from decoded messages that are represented as classes created dynamically at runtime with __slots__. This includes ROS1, ROS2, and other message formats that use class-based representations.

static get_attribute(value, attribute)#

Get the value of a specific attribute from the given value.

Parameters:
  • value – The decoded message value to access.

  • attribute – Name of the attribute to retrieve.

Returns:

The value of the specified attribute.

static get_attribute_names(value)#

Get the names of all attributes available in the given value.

Parameters:

value – The decoded message value to inspect.

Returns:

Sequence of attribute names available in the value.

static has_sub_attributes(value)#

Check if the given value has nested attributes that can be accessed.

Parameters:

value – The decoded message value to inspect.

Returns:

True if the value has nested attributes, False otherwise.
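
As an illustration of the class-based case, the sketch below builds slot-based classes at runtime and reads them back through a getter. It assumes that attribute names can be taken from `__slots__` and that the presence of `__slots__` signals nesting; `make_message_class` and `SlotsAttrGetterSketch` are hypothetical names, and the real ClassAttrGetter may behave differently.

```python
from typing import Any


def make_message_class(name: str, fields: list[str]) -> type:
    """Create a class with __slots__ at runtime (hypothetical helper)."""
    return type(name, (), {"__slots__": tuple(fields)})


class SlotsAttrGetterSketch:
    """Hypothetical getter for instances of slot-based classes."""

    @staticmethod
    def get_attribute(value: Any, attribute: str) -> Any:
        return getattr(value, attribute)

    @staticmethod
    def get_attribute_names(value: Any) -> list[str]:
        # Assumption: the slot names are the message's field names.
        return list(getattr(value, "__slots__", ()))

    @staticmethod
    def has_sub_attributes(value: Any) -> bool:
        # Assumption: only slot-based instances are treated as nested.
        return hasattr(value, "__slots__")


Point = make_message_class("Point", ["x", "y"])
Pose = make_message_class("Pose", ["position", "frame_id"])

point = Point()
point.x, point.y = 1.5, -0.5
pose = Pose()
pose.position, pose.frame_id = point, "map"

print(SlotsAttrGetterSketch.get_attribute_names(pose))  # ['position', 'frame_id']
```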

class roboto.domain.topics.decoded_message.DecodedMessage(msg, message_paths)#

Facade for values returned from message decoders.

Provides a unified interface for working with decoded messages regardless of their original encoding format or source. Handles the conversion of decoded message data into dictionary format suitable for analysis and processing.

A decoded message may be one of several types:
  • A dictionary when the message data is encoded as JSON

  • A dynamically created class when the message data is encoded as ROS1, ROS2 (CDR), or other binary formats

This class abstracts away the differences between these formats and provides consistent access to message data through a dictionary interface, filtering the output based on the specified message paths.

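Parameters:
  • msg – The decoded message returned by a message decoder: a dictionary or an instance of a dynamically created class.

  • message_paths – The message paths used to filter the output of to_dict().

static is_path_match(attrib, message_path)#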

Check if an attribute path matches or is a parent of a message path.

Determines whether a given attribute path should be included when filtering message data based on the specified message paths.

Parameters:
  • attrib (str) – Attribute path to check (e.g., “pose.position”).

  • message_path (str) – Target message path (e.g., “pose.position.x”).

Returns:

True if the attribute matches or is a parent of the message path.

Return type:

bool

Examples

>>> DecodedMessage.is_path_match("pose", "pose.position.x")
True
>>> DecodedMessage.is_path_match("pose.position", "pose.position.x")
True
>>> DecodedMessage.is_path_match("pose.position.x", "pose.position.x")
True
>>> DecodedMessage.is_path_match("velocity", "pose.position.x")
False
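
The matching rule shown in these examples can be sketched by comparing dotted paths segment by segment. This is one possible reading of the documented behavior, not the library's code; `is_path_match_sketch` is a hypothetical name.

```python
def is_path_match_sketch(attrib: str, message_path: str) -> bool:
    """Return True if attrib equals message_path or is a dotted ancestor of it."""
    attrib_parts = attrib.split(".")
    path_parts = message_path.split(".")
    # A parent path can never have more segments than the path it contains.
    if len(attrib_parts) > len(path_parts):
        return False
    # Compare whole segments so that "pos" is not treated as a parent of "pose".
    return path_parts[: len(attrib_parts)] == attrib_parts


print(is_path_match_sketch("pose.position", "pose.position.x"))  # True
print(is_path_match_sketch("velocity", "pose.position.x"))       # False
```

Comparing segments rather than raw string prefixes is a design choice of this sketch: it keeps an attribute such as "pos" from matching "pose.position.x".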

to_dict()#

Convert the decoded message to a dictionary format.

Extracts and organizes message data into a dictionary structure, including only the attributes that match the specified message paths. Handles both flat and nested message structures.

Returns:

Dictionary containing the filtered message data with attribute names as keys.

Return type:

dict

Examples

>>> # Assuming message_paths include "pose.position.x" and "velocity"
>>> decoded_msg = DecodedMessage(ros_message, message_paths)
>>> data_dict = decoded_msg.to_dict()
>>> print(data_dict)
{'pose': {'position': {'x': 1.5}}, 'velocity': 2.0}
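
To make the filtering step concrete, here is a minimal sketch that applies the same idea to a plain nested dictionary. It is an assumption about how such filtering could work, not the implementation of to_dict(); `filter_by_paths` and the sample `message` are hypothetical.

```python
from typing import Any


def filter_by_paths(
    data: dict[str, Any], message_paths: list[str], prefix: str = ""
) -> dict[str, Any]:
    """Keep only the entries that are a requested path or an ancestor of one."""
    result: dict[str, Any] = {}
    for key, value in data.items():
        path = f"{prefix}.{key}" if prefix else key
        wanted = any(p == path or p.startswith(path + ".") for p in message_paths)
        if not wanted:
            continue
        if isinstance(value, dict) and path not in message_paths:
            # Ancestor of a requested path: descend and keep filtering.
            result[key] = filter_by_paths(value, message_paths, path)
        else:
            # Exactly the requested path (or a leaf): keep the value as-is.
            result[key] = value
    return result


message = {
    "pose": {"position": {"x": 1.5, "y": 0.2}, "orientation": {"w": 1.0}},
    "velocity": 2.0,
    "stamp": 17,
}
print(filter_by_paths(message, ["pose.position.x", "velocity"]))
# {'pose': {'position': {'x': 1.5}}, 'velocity': 2.0}
```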

class roboto.domain.topics.decoded_message.DictAttrGetter#

Bases: AttrGetter

Attribute getter for JSON decoded data.

Handles access to attributes from JSON decoded messages, which are represented as standard Python dictionaries.

static get_attribute(value, attribute)#

Get the value of a specific attribute from the given value.

Parameters:
  • value – The decoded message value to access.

  • attribute – Name of the attribute to retrieve.

Returns:

The value of the specified attribute.

static get_attribute_names(value)#

Get the names of all attributes available in the given value.

Parameters:

value – The decoded message value to inspect.

Returns:

Sequence of attribute names available in the value.

static has_sub_attributes(value)#

Check if the given value has nested attributes that can be accessed.

Parameters:

value – The decoded message value to inspect.

Returns:

True if the value has nested attributes, False otherwise.
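
For JSON-decoded data, the three operations map naturally onto standard dictionary operations. The sketch below assumes key lookup for attribute access and an `isinstance` check for nesting; `DictAttrGetterSketch` is a hypothetical name, and the real DictAttrGetter may differ in detail.

```python
import json
from typing import Any


class DictAttrGetterSketch:
    """Hypothetical getter for messages decoded from JSON into dictionaries."""

    @staticmethod
    def get_attribute(value: dict[str, Any], attribute: str) -> Any:
        return value[attribute]

    @staticmethod
    def get_attribute_names(value: dict[str, Any]) -> list[str]:
        return list(value.keys())

    @staticmethod
    def has_sub_attributes(value: Any) -> bool:
        # Assumption: only nested dictionaries count as having sub-attributes.
        return isinstance(value, dict)


decoded = json.loads('{"pose": {"position": {"x": 1.5}}, "velocity": 2.0}')

print(DictAttrGetterSketch.get_attribute_names(decoded))  # ['pose', 'velocity']
pose = DictAttrGetterSketch.get_attribute(decoded, "pose")
print(DictAttrGetterSketch.has_sub_attributes(pose))      # True
```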

roboto.domain.topics.decoded_message.KNOWN_PATH_MAPPING: dict[str, dict[str, tuple[str, ...]]]#