roboto.domain.topics#

Submodules#

Package Contents#

class roboto.domain.topics.AddMessagePathRepresentationRequest(/, **data)#

Bases: BaseAddRepresentationRequest

Request to associate a message path with a representation.

Creates a link between a specific message path and a data representation, enabling efficient access to individual fields within topic data.

Parameters:

data (Any)

message_path_id: str#
model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class roboto.domain.topics.AddMessagePathRequest(/, **data)#

Bases: pydantic.BaseModel

Request to add a new message path to a topic.

Defines a new message path within a topic’s schema, specifying its data type, canonical type, and initial metadata. Used during topic creation or when extending an existing topic’s schema.

Parameters:

data (Any)

message_path#

Dot-delimited path to the attribute (e.g., “pose.position.x”).

data_type#

Native data type as it appears in the original data source (e.g., “float32”, “geometry_msgs/Pose”). Used for display purposes.

canonical_data_type#

Normalized Roboto data type that enables specialized platform features for maps, images, timestamps, and other data.

metadata#

Initial key-value pairs to associate with the message path.

canonical_data_type: roboto.domain.topics.record.CanonicalDataType#
data_type: str#
message_path: str#
metadata: dict[str, Any] = None#
model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class roboto.domain.topics.CanonicalDataType#

Bases: enum.Enum

Normalized data types used across different robotics frameworks.

Well-known and simplified data types that provide a common vocabulary for describing message path data types across different frameworks and technologies. These canonical types are primarily used for UI rendering decisions and cross-platform compatibility.

The canonical types abstract away framework-specific details while preserving the essential characteristics needed for data processing and visualization.

Example mappings:
  • float32 -> CanonicalDataType.Number

  • uint8[] -> CanonicalDataType.Array

  • sensor_msgs/Image -> CanonicalDataType.Image

  • geometry_msgs/Pose -> CanonicalDataType.Object

  • std_msgs/Header -> CanonicalDataType.Object

  • string -> CanonicalDataType.String

  • char -> CanonicalDataType.String

  • bool -> CanonicalDataType.Boolean

  • byte -> CanonicalDataType.Byte
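
The example mappings above can be read as a lookup table with Unknown as the fallback. The sketch below is illustrative only: it defines a local stand-in enum that mirrors a subset of the values documented here instead of importing the SDK, and NATIVE_TO_CANONICAL and to_canonical are hypothetical names, not part of Roboto's API.

```python
import enum


class CanonicalDataType(enum.Enum):
    """Local stand-in mirroring some of the documented enum values."""

    Array = "array"
    Boolean = "boolean"
    Byte = "byte"
    Image = "image"
    Number = "number"
    Object = "object"
    String = "string"
    Unknown = "unknown"


# Hypothetical lookup table built from the example mappings listed above.
NATIVE_TO_CANONICAL = {
    "float32": CanonicalDataType.Number,
    "uint8[]": CanonicalDataType.Array,
    "sensor_msgs/Image": CanonicalDataType.Image,
    "geometry_msgs/Pose": CanonicalDataType.Object,
    "std_msgs/Header": CanonicalDataType.Object,
    "string": CanonicalDataType.String,
    "char": CanonicalDataType.String,
    "bool": CanonicalDataType.Boolean,
    "byte": CanonicalDataType.Byte,
}


def to_canonical(native_type: str) -> CanonicalDataType:
    """Return the canonical type for a native type, or Unknown if unmapped."""
    return NATIVE_TO_CANONICAL.get(native_type, CanonicalDataType.Unknown)
```

Falling back to Unknown keeps the lookup total, though a real ingestion step would likely try harder to classify a type before giving up.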

Array = 'array'#

A sequence of values.

Boolean = 'boolean'#
Byte = 'byte'#
Image = 'image'#

Special purpose type for data that can be rendered as an image.

LatDegFloat = 'latdegfloat'#

Latitude in degrees. E.g. 47.6749387 (used in ULog ver_data_format >= 2)

LatDegInt = 'latdegint'#

Latitude in degrees, expressed as an integer. E.g. 317534036 (used in ULog ver_data_format < 2)

LonDegFloat = 'londegfloat'#

Longitude in degrees. E.g. 9.1445274 (used in ULog ver_data_format >= 2)

LonDegInt = 'londegint'#

Longitude in degrees, expressed as an integer. E.g. 1199146398 (used in ULog ver_data_format < 2)

Number = 'number'#
NumberArray = 'number_array'#
Object = 'object'#

A struct with attributes.

String = 'string'#
Timestamp = 'timestamp'#

Time elapsed since the Unix epoch, identifying a single instant on the timeline. Roboto clients look for a "unit" metadata key on the MessagePath record and assume "ns" if none is found. If the timestamp is in a different unit, add the following metadata to the MessagePath record: { "unit": "s"|"ms"|"us"|"ns" }. The unit must be a known value from TimeUnit.
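
As a rough illustration of the unit rule described above, the sketch below normalizes an integer timestamp to nanoseconds using the "unit" metadata key, defaulting to "ns" when the key is absent. The helper name timestamp_to_ns and the multiplier table are assumptions made for this example; they are not SDK functions.

```python
# Multipliers from each documented unit to nanoseconds.
NANOSECONDS_PER_UNIT = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}


def timestamp_to_ns(value: int, metadata: dict) -> int:
    """Convert an integer timestamp to nanoseconds using the "unit" metadata key."""
    unit = metadata.get("unit", "ns")  # documented default when no unit is set
    if unit not in NANOSECONDS_PER_UNIT:
        raise ValueError(f"Unsupported time unit: {unit!r}")
    return value * NANOSECONDS_PER_UNIT[unit]
```

For instance, timestamp_to_ns(1722870127, {"unit": "s"}) yields 1722870127000000000, while a value with empty metadata is returned unchanged.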

Unknown = 'unknown'#

This is a fallback and should be used sparingly.

class roboto.domain.topics.CreateTopicRequest(/, **data)#

Bases: pydantic.BaseModel

Request to create a new topic in the Roboto platform.

Contains all the information needed to register a topic found within a source recording file, including its schema, temporal boundaries, and initial message paths.

Parameters:

data (Any)

association: roboto.association.Association#
end_time: int | None = None#
message_count: int | None = None#
message_paths: collections.abc.Sequence[AddMessagePathRequest] | None = None#
metadata: collections.abc.Mapping[str, Any] | None = None#
model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

schema_checksum: str | None = None#
schema_name: str | None = None#
start_time: int | None = None#
topic_name: str#
class roboto.domain.topics.DeleteMessagePathRequest(/, **data)#

Bases: pydantic.BaseModel

Request to delete a message path from a topic.

Removes a message path from a topic’s schema. This operation cannot be undone and will remove all associated data and metadata for the specified path.

Parameters:

data (Any)

message_path: str#

Message path name.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class roboto.domain.topics.MessagePath(record, roboto_client=None, topic_data_service=None)#

Represents a message path within a topic in the Roboto platform.

A message path defines a specific field or signal within a topic’s data schema, using dot notation to specify nested attributes. Message paths enable fine-grained access to individual data elements within time-series robotics data, supporting operations like statistical analysis, data filtering, and visualization.

Each message path has an associated data type (both native and canonical), metadata, and statistical information computed from the underlying data. Message paths are the fundamental building blocks for data analysis in Roboto, allowing users to work with specific signals or measurements from complex robotics data structures.

Message paths support temporal filtering, data export to various formats including pandas DataFrames, and integration with the broader Roboto analytics ecosystem. They provide efficient access to time-series data while maintaining the semantic structure of the original robotics messages.

The MessagePath class serves as the primary interface for accessing individual data signals within topics, providing methods for data retrieval, statistical analysis, and metadata management.

Parameters:
DELIMITER: ClassVar = '.'#
property canonical_data_type: roboto.domain.topics.record.CanonicalDataType#

Canonical Roboto data type corresponding to the native data type.

Return type:

roboto.domain.topics.record.CanonicalDataType

property count: PreComputedStat#

Number of data points available for this message path.

Return type:

PreComputedStat

property created: datetime.datetime#

Timestamp when this message path was created.

Return type:

datetime.datetime

property created_by: str#

Identifier of the user or system that created this message path.

Return type:

str

property data_type: str#

Native data type for this message path, e.g. ‘float32’.

Return type:

str

classmethod from_id(message_path_id, roboto_client=None, topic_data_service=None)#

Retrieve a message path by its unique identifier.

Fetches a message path record from the Roboto platform using its unique ID. This is useful when you have a message path identifier from another operation.

Parameters:
Returns:

MessagePath instance representing the requested message path.

Raises:
Return type:

MessagePath

Examples

>>> message_path = MessagePath.from_id("mp_abc123")
>>> print(message_path.path)
angular_velocity.x
>>> print(message_path.canonical_data_type)
CanonicalDataType.Number
get_data(start_time=None, end_time=None, cache_dir=None)#

Return data for this specific message path.

Retrieves and yields data records containing only the values for this message path, with optional temporal filtering. This provides a focused view of a single signal or field within the broader topic data.

Parameters:
  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • cache_dir (Union[str, pathlib.Path, None]) – Directory where topic data will be downloaded if necessary. Defaults to DEFAULT_CACHE_DIR.

Yields:

Dictionary records containing the log_time and the value for this message path.

Return type:

collections.abc.Generator[dict[str, Any], None, None]
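
The time bounds form a half-open interval: start_time is inclusive and end_time is exclusive. The sketch below shows that rule applied to in-memory records. It is not how the SDK fetches data; filter_by_time is a hypothetical helper, and the records are assumed to carry an integer log_time in nanoseconds.

```python
from collections.abc import Iterable, Iterator
from typing import Any, Optional


def filter_by_time(
    records: Iterable[dict[str, Any]],
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
) -> Iterator[dict[str, Any]]:
    """Yield records with start_time <= log_time < end_time."""
    for record in records:
        log_time = record["log_time"]
        if start_time is not None and log_time < start_time:
            continue  # before the inclusive start bound
        if end_time is not None and log_time >= end_time:
            continue  # at or after the exclusive end bound
        yield record
```

With a half-open interval, consecutive windows such as [t0, t1) and [t1, t2) can be queried back to back without returning any record twice.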

Notes

For each example below, assume the following is a sample datum record that can be found in this message path’s associated topic:

{
    "angular_velocity": {
        "x": <uint32>,
        "y": <uint32>,
        "z": <uint32>
    },
    "orientation": {
        "x": <uint32>,
        "y": <uint32>,
        "z": <uint32>,
        "w": <uint32>
    }
}

Examples

Print all data for a specific message path:

>>> topic = Topic.from_name_and_file("/imu/data", "file_abc123")
>>> angular_velocity_x = topic.get_message_path("angular_velocity.x")
>>> for record in angular_velocity_x.get_data():
...     print(f"Time: {record['log_time']}, Value: {record['angular_velocity']['x']}")

Get data within a time range:

>>> for record in angular_velocity_x.get_data(
...     start_time=1722870127699468923,
...     end_time=1722870127799468923
... ):
...     print(record)

Collect data into a dataframe (requires installing the roboto[analytics] extra):

>>> df = angular_velocity_x.get_data_as_df()
>>> import math
>>> assert math.isclose(angular_velocity_x.mean, df[angular_velocity_x.path].mean())
get_data_as_df(start_time=None, end_time=None, cache_dir=None)#

Return this message path’s data as a pandas DataFrame.

Retrieves message path data and converts it to a pandas DataFrame for analysis and visualization. The DataFrame is indexed by log time and contains a column for this message path’s values.

Parameters:
  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • cache_dir (Union[str, pathlib.Path, None]) – Directory where topic data will be downloaded if necessary. Defaults to DEFAULT_CACHE_DIR.

Returns:

pandas DataFrame containing the message path data, indexed by log time.

Raises:

ImportError – pandas is not installed. Install with roboto[analytics] extra.

Return type:

pandas.DataFrame

Notes

Requires installing this package using the roboto[analytics] extra.

Examples

>>> topic = Topic.from_name_and_file("/imu/data", "file_abc123")
>>> angular_velocity_x = topic.get_message_path("angular_velocity.x")
>>> df = angular_velocity_x.get_data_as_df()
>>> print(df.head())
                        angular_velocity.x
log_time
1722870127699468923                  0.1
1722870127699468924                  0.15
>>> print(f"Mean: {df[angular_velocity_x.path].mean()}")
Mean: 0.125
property max: PreComputedStat#

Maximum value observed for this message path.

Return type:

PreComputedStat

property mean: PreComputedStat#

Mean (average) value for this message path.

Return type:

PreComputedStat

property median: PreComputedStat#

Median value for this message path.

Return type:

PreComputedStat

property message_path_id: str#

Unique identifier for this message path.

Return type:

str

property metadata: dict[str, Any]#

Metadata dictionary associated with this message path.

Return type:

dict[str, Any]

property min: PreComputedStat#

Minimum value observed for this message path.

Return type:

PreComputedStat

property modified: datetime.datetime#

Timestamp when this message path was last modified.

Return type:

datetime.datetime

property modified_by: str#

Identifier of the user or system that last modified this message path.

Return type:

str

property org_id: str#

Organization ID that owns this message path.

Return type:

str

static parents(path)#

Get parent paths for a message path in dot notation.

Given a message path in dot notation, returns a list of its parent paths ordered from most specific to least specific.

Parameters:

path (str) – Message path in dot notation (e.g., “pose.pose.position.x”).

Returns:

List of parent paths in dot notation, ordered from most to least specific.

Return type:

list[str]

Examples

>>> path = "pose.pose.position.x"
>>> MessagePath.parents(path)
['pose.pose.position', 'pose.pose', 'pose']
>>> # Single level path has no parents
>>> MessagePath.parents("velocity")
[]
static parts(path)#

Split message path in dot notation into its constituent parts.

Splits a message path string into individual components, useful for programmatic manipulation of message path hierarchies.

Parameters:

path (str) – Message path in dot notation (e.g., “pose.pose.position.x”).

Returns:

List of path components in order from root to leaf.

Return type:

list[str]

Examples

>>> path = "pose.pose.position.x"
>>> MessagePath.parts(path)
['pose', 'pose', 'position', 'x']
>>> # Single component path
>>> MessagePath.parts("velocity")
['velocity']
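
The behavior documented for parts and parents can be reproduced with plain string operations. The functions below are a minimal standalone sketch written for illustration; they are assumed to match the documented outputs but are not the SDK's implementation.

```python
def parts(path: str, delimiter: str = ".") -> list[str]:
    """Split a delimited message path into components, root first."""
    return path.split(delimiter)


def parents(path: str, delimiter: str = ".") -> list[str]:
    """Return ancestor paths ordered from most to least specific."""
    components = parts(path, delimiter)
    # Drop one trailing component at a time: 'a.b.c' -> ['a.b', 'a'].
    return [
        delimiter.join(components[:end])
        for end in range(len(components) - 1, 0, -1)
    ]
```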
property path: str#

Dot-delimited path to the attribute (e.g., ‘pose.position.x’).

Return type:

str

property record: roboto.domain.topics.record.MessagePathRecord#

Underlying MessagePathRecord for this message path.

Return type:

roboto.domain.topics.record.MessagePathRecord

to_association()#

Convert this message path to an Association object.

Creates an Association object that can be used to reference this message path in other parts of the Roboto platform.

Returns:

Association object representing this message path.

Return type:

roboto.association.Association

Examples

>>> message_path = MessagePath.from_id("mp_abc123")
>>> association = message_path.to_association()
>>> print(association.association_type)
AssociationType.MessagePath
>>> print(association.association_id)
mp_abc123
property topic_id: str#

Unique identifier of the topic containing this message path.

Return type:

str

class roboto.domain.topics.MessagePathChangeset(/, **data)#

Bases: pydantic.BaseModel

Changeset for batch operations on topic message paths.

Defines a collection of add, delete, and update operations to be applied to a topic’s message paths in a single atomic operation. Useful for making multiple schema changes efficiently.

Parameters:

data (Any)

check_replace_all_correctness()#
Return type:

MessagePathChangeset

classmethod from_replacement_message_paths(message_paths)#

Create a changeset that replaces all existing message paths.

Creates a changeset that will replace all existing message paths on a topic with the provided set of message paths. This is useful for completely redefining a topic’s schema.

Parameters:

message_paths (collections.abc.Sequence[AddMessagePathRequest]) – Sequence of message path requests to replace existing paths.

Returns:

MessagePathChangeset configured to replace all existing message paths.

Return type:

MessagePathChangeset

Examples

>>> from roboto.domain.topics import AddMessagePathRequest, CanonicalDataType
>>> new_paths = [
...     AddMessagePathRequest(
...         message_path="velocity.x",
...         data_type="float32",
...         canonical_data_type=CanonicalDataType.Number
...     )
... ]
>>> changeset = MessagePathChangeset.from_replacement_message_paths(new_paths)
has_changes()#

Check whether the changeset contains any actual changes.

Returns:

True if the changeset contains operations that would modify the topic’s message paths.

Return type:

bool

message_paths_to_add: collections.abc.Sequence[AddMessagePathRequest] | None = None#

Message paths to add to a topic.

message_paths_to_delete: collections.abc.Sequence[DeleteMessagePathRequest] | None = None#

Message paths to delete from a topic.

message_paths_to_update: collections.abc.Sequence[UpdateMessagePathRequest] | None = None#

Message paths to update on a topic.

replace_all: bool = False#

Flag indicating whether this changeset should replace all message paths on a topic.

It assumes that the replacement message paths will be provided via message_paths_to_add. Rather than setting this flag directly, use appropriate class methods such as from_replacement_message_paths.
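
To make the relationship between replace_all and message_paths_to_add concrete, here is a simplified stand-in built on dataclasses. It is a sketch under stated assumptions: ChangesetSketch is hypothetical, message paths are reduced to plain strings, and the has_changes rule shown is a guess at reasonable behavior, not the SDK's actual logic.

```python
from dataclasses import dataclass, field


@dataclass
class ChangesetSketch:
    """Hypothetical stand-in for MessagePathChangeset; paths are plain strings."""

    message_paths_to_add: list[str] = field(default_factory=list)
    message_paths_to_delete: list[str] = field(default_factory=list)
    message_paths_to_update: list[str] = field(default_factory=list)
    replace_all: bool = False

    @classmethod
    def from_replacement_message_paths(
        cls, message_paths: list[str]
    ) -> "ChangesetSketch":
        # Replacement paths travel in message_paths_to_add, as described above.
        return cls(message_paths_to_add=list(message_paths), replace_all=True)

    def has_changes(self) -> bool:
        # Assumed rule: any pending operation, or a replace-all request, is a change.
        return self.replace_all or bool(
            self.message_paths_to_add
            or self.message_paths_to_delete
            or self.message_paths_to_update
        )
```

Routing construction through a class method keeps the flag and the replacement list consistent, which is presumably why the documentation advises against setting replace_all directly.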

class roboto.domain.topics.MessagePathMetadataWellKnown#

Bases: str, enum.Enum

Well-known metadata key names (with well-known semantics) that may be set in metadata.

These are most often set by Roboto’s first-party ingestion actions and used by Roboto clients.

ColumnName = 'column_name'#

The original name or path to this field in the source data schema. May differ from message_path if character substitutions were applied to conform to naming requirements.

Notes

  • Use of this metadata field is soft-deprecated as of SDK v0.24.0.

  • Prefer use of source_path and path_in_schema instead. While those attributes are currently derived from metadata stored in this key, first-class support for specifying those attributes will be added to MessagePathRecord creation/update APIs in an upcoming SDK release.

Unit = 'unit'#

Unit of a field. E.g., ‘ns’ for a timestamp. If provided, must match a known, supported unit from TimeUnit.

class roboto.domain.topics.MessagePathRecord(/, **data)#

Bases: pydantic.BaseModel

Record representing a message path within a topic.

Defines a specific field or signal within a topic’s data schema, including its data type, metadata, and statistical information. Message paths use dot notation to specify nested attributes within complex message structures.

Message paths are the fundamental units for accessing individual data elements within time-series robotics data, enabling fine-grained analysis and visualization of specific signals or measurements.

Parameters:

data (Any)

canonical_data_type: CanonicalDataType#

Normalized data type, used primarily internally by the Roboto Platform.

created: datetime.datetime#
created_by: str#
data_type: str#

‘Native’/framework-specific data type of the attribute at this path. E.g. “float32”, “uint8[]”, “geometry_msgs/Pose”, “string”.

message_path: str#

Dot-delimited path to the attribute within the datum record.

message_path_id: str#
metadata: collections.abc.Mapping[str, Any] = None#

Key-value pairs to associate with this message path for discovery and search, e.g. { ‘min’: ‘0.71’, ‘max’: ‘1.77’ }

modified: datetime.datetime#
modified_by: str#
org_id: str#

This message path’s organization ID, which is the organization ID of the containing topic.

parents(delimiter='.')#

Logical message path ancestors of this path.

Examples

Given a deeply nested field root.sub_obj_1.sub_obj_2.leaf_field:

>>> field = "root.sub_obj_1.sub_obj_2.leaf_field"
>>> record = MessagePathRecord(message_path=field) # other fields omitted for brevity
>>> print(record.parents())
['root.sub_obj_1.sub_obj_2', 'root.sub_obj_1', 'root']
Parameters:

delimiter (str)

Return type:

list[str]

path_in_schema: list[str]#

List of path components representing the field’s location in the original data schema. Unlike message_path, which must conform to Roboto-specific naming requirements and assumes that dot-separated path parts imply nested data, this preserves the exact path from the source data for accurate attribute access. This is expected to be the split representation of source_path.
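
The sketch below illustrates why a list of components can be more reliable than splitting a dotted string: a source field name may itself contain a dot. The resolve helper and the sample datum are hypothetical and exist only for this example.

```python
from collections.abc import Mapping, Sequence
from typing import Any


def resolve(datum: Mapping[str, Any], path_in_schema: Sequence[str]) -> Any:
    """Walk a nested datum one path component at a time."""
    value: Any = datum
    for component in path_in_schema:
        value = value[component]
    return value


# Hypothetical datum whose source field name itself contains a dot.
datum = {"gps": {"fix.status": 3}}

# The component list addresses the field exactly.
status = resolve(datum, ["gps", "fix.status"])
```

Splitting the string "gps.fix.status" on dots would instead produce three components and fail with a KeyError on this datum.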

representations: collections.abc.MutableSequence[RepresentationRecord] = None#

Zero to many Representations of this MessagePath.

source_path: str#

The original name of this field in the source data schema. May differ from message_path if character substitutions were applied to conform to naming requirements.

This is the preferred field to use when specifying message_path_include or message_path_exclude to the get_data or get_data_as_df methods of Topic and Event.

topic_id: str#
class roboto.domain.topics.MessagePathRepresentationMapping(/, **data)#

Bases: pydantic.BaseModel

Mapping between message paths and their data representation.

Associates a set of message paths with a specific representation that contains their data. This mapping is used to efficiently locate and access data for specific message paths within topic representations.

Parameters:

data (Any)

message_paths: collections.abc.MutableSequence[roboto.domain.topics.record.MessagePathRecord]#
representation: roboto.domain.topics.record.RepresentationRecord#
class roboto.domain.topics.MessagePathStatistic#

Bases: enum.Enum

Statistics computed by Roboto in our standard ingestion actions.

Count = 'count'#
Max = 'max'#
Mean = 'mean'#
Median = 'median'#
Min = 'min'#
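
For intuition, the five statistics named by this enum can be computed locally from a list of numeric values with the standard library. This is only a sketch: summarize is a hypothetical helper, and it does not claim to reproduce how Roboto's ingestion actions compute or store these values.

```python
import statistics
from collections.abc import Sequence


def summarize(values: Sequence[float]) -> dict[str, float]:
    """Compute the statistics named by MessagePathStatistic for a list of numbers."""
    return {
        "count": len(values),
        "max": max(values),
        "mean": statistics.mean(values),
        "median": statistics.median(values),
        "min": min(values),
    }
```

The dictionary keys match the enum's string values, so a result can be indexed with, for example, "mean" or "median".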
class roboto.domain.topics.RepresentationRecord(/, **data)#

Bases: pydantic.BaseModel

Record representing a data representation for topic content.

A representation is a pointer to processed topic data stored in a specific format and location. Representations enable efficient access to topic data by providing multiple storage formats optimized for different use cases.

Most message paths within a topic point to the same representation (e.g., an MCAP or Parquet file containing all topic data). However, some message paths may have multiple representations for analytics or preview formats.

Representations are versioned and associated with specific files or storage locations through the association field.

Parameters:

data (Any)

association: roboto.association.Association#

Identifier and entity type with which this Representation is associated. E.g., a file, a database.

created: datetime.datetime#
modified: datetime.datetime#
representation_id: str#
storage_format: RepresentationStorageFormat#
topic_id: str#
version: int#
class roboto.domain.topics.RepresentationStorageFormat#

Bases: enum.Enum

Supported storage formats for topic data representations.

Defines the available formats for storing and accessing topic data within the Roboto platform. Each format has different characteristics and use cases.

MCAP = 'mcap'#

MCAP format - optimized for robotics time-series data with efficient random access.

PARQUET = 'parquet'#

Parquet format - columnar storage optimized for analytics and large-scale data processing.

class roboto.domain.topics.SetDefaultRepresentationRequest(/, **data)#

Bases: BaseAddRepresentationRequest

Request to set the default representation for a topic.

Designates a specific representation as the default for accessing topic data. The default representation is used when no specific representation is requested for data access operations.

Parameters:

data (Any)

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class roboto.domain.topics.Topic(record, roboto_client=None, topic_data_service=None)#

Represents a topic within the Roboto platform.

A topic is a sequence of structured time-series data linked to a source file, typically containing sensor readings, robot state information, or other timestamped data streams. Topics are fundamental building blocks for data analysis in robotics, providing organized access to time-synchronized data from various sources like ROS bags, MCAP files, or other structured data formats.

Each topic follows a defined schema where message paths represent the individual fields or signals within that schema. Topics enable efficient querying, filtering, and analysis of time-series data, supporting operations like temporal slicing, field selection, and data export to various formats including pandas DataFrames.

Topics are associated with files and inherit access permissions from their parent dataset. They provide the primary interface for accessing ingested robotics data in the Roboto platform, supporting both programmatic access through the SDK and visualization in the web interface.

The Topic class serves as the main interface for topic operations in the Roboto SDK, providing methods for data retrieval, message path management, metadata operations, and schema management.

Parameters:
add_message_path(message_path, data_type, canonical_data_type, metadata=None)#

Add a new message path to this topic.

Creates a new message path within this topic, defining a specific field or signal that can be extracted from the topic’s data. Message paths use dot notation to specify nested attributes within the topic’s schema.

Parameters:
  • message_path (str) – Dot-delimited path to the attribute (e.g., “pose.position.x”).

  • data_type (str) – Native data type of the attribute as it appears in the original data source (e.g., “float32”, “uint8[]”, “geometry_msgs/Pose”). Used primarily for display purposes and should match the robot’s runtime language or schema definitions.

  • canonical_data_type (roboto.domain.topics.record.CanonicalDataType) – Normalized Roboto data type that enables specialized platform features for maps, images, timestamps, and other data with special interpretations.

  • metadata (Optional[dict[str, Any]]) – Additional metadata to associate with the message path.

Returns:

MessagePathRecord representing the newly created message path.

Raises:
Return type:

roboto.domain.topics.record.MessagePathRecord

Examples

>>> from roboto.domain.topics import CanonicalDataType
>>> topic = Topic.from_id("topic_xyz789")
>>> message_path = topic.add_message_path(
...     message_path="pose.position.x",
...     data_type="float64",
...     canonical_data_type=CanonicalDataType.Number,
...     metadata={"unit": "meters"}
... )
>>> print(message_path.message_path)
pose.position.x
add_message_path_representation(message_path_id, association, storage_format, version)#

Add a representation for a specific message path.

Associates a message path with a data representation, enabling efficient access to specific fields within the topic data. Representations can be in different storage formats like MCAP or Parquet.

Parameters:
Returns:

RepresentationRecord representing the newly created representation.

Raises:
Return type:

roboto.domain.topics.record.RepresentationRecord

Examples

>>> from roboto.association import Association
>>> from roboto.domain.topics import RepresentationStorageFormat
>>> topic = Topic.from_id("topic_xyz789")
>>> representation = topic.add_message_path_representation(
...     message_path_id="mp_123",
...     association=Association.file("file_repr_456"),
...     storage_format=RepresentationStorageFormat.MCAP,
...     version=1
... )
>>> print(representation.representation_id)
repr_789
property association: roboto.association.Association#

Association linking this topic to its source entity (typically a file).

Return type:

roboto.association.Association

classmethod create(file_id, topic_name, end_time=None, message_count=None, metadata=None, schema_checksum=None, schema_name=None, start_time=None, message_paths=None, caller_org_id=None, roboto_client=None)#

Create a new topic associated with a file.

Creates a new topic record in the Roboto platform, associating it with the specified file and defining its schema and temporal boundaries. This method is typically used during data ingestion to register topics found in robotics data files.

Note

On successful creation, the topic will be a metadata-only container and will not be visualizable or usable via data access methods like get_data_as_df() until one or more representations have been registered. This is typically handled automatically by ingestion actions, but power users may need to manage representations manually.

Parameters:
  • file_id (str) – Unique identifier of the file this topic is associated with.

  • topic_name (str) – Name of the topic (e.g., “/camera/image”, “/imu/data”).

  • end_time (Optional[int]) – End time of the topic data in nanoseconds since UNIX epoch.

  • message_count (Optional[int]) – Total number of messages in this topic.

  • metadata (Optional[collections.abc.Mapping[str, Any]]) – Additional metadata to associate with the topic.

  • schema_checksum (Optional[str]) – Checksum of the topic’s message schema for validation.

  • schema_name (Optional[str]) – Name of the message schema (e.g., “sensor_msgs/Image”).

  • start_time (Optional[int]) – Start time of the topic data in nanoseconds since UNIX epoch.

  • message_paths (Optional[collections.abc.Sequence[roboto.domain.topics.operations.AddMessagePathRequest]]) – Message paths to create along with the topic.

  • caller_org_id (Optional[str]) – Organization ID to create the topic in. Required for multi-org users.

  • roboto_client (Optional[roboto.http.RobotoClient]) – HTTP client for API communication. If None, uses the default client.

Returns:

Topic instance representing the newly created topic.

Raises:
Return type:

Topic

Examples

Create a basic topic for camera data:

>>> topic = Topic.create(
...     file_id="file_abc123",
...     topic_name="/camera/image",
...     schema_name="sensor_msgs/Image",
...     start_time=1722870127699468923,
...     end_time=1722870127799468923,
...     message_count=100
... )
>>> print(topic.topic_id)
topic_xyz789

Create a topic with metadata and message paths:

>>> from roboto.domain.topics import AddMessagePathRequest, CanonicalDataType
>>> message_paths = [
...     AddMessagePathRequest(
...         message_path="header.stamp.sec",
...         data_type="uint32",
...         canonical_data_type=CanonicalDataType.Timestamp,
...         metadata={"unit": "s"}  # seconds; clients assume "ns" when unset
...     )
... ]
>>> topic = Topic.create(
...     file_id="file_abc123",
...     topic_name="/imu/data",
...     schema_name="sensor_msgs/Imu",
...     metadata={"sensor_type": "IMU", "frequency": 100},
...     message_paths=message_paths
... )
property created: datetime.datetime#

Timestamp when this topic was created in the Roboto platform.

Return type:

datetime.datetime

property created_by: str#

Identifier of the user or system that created this topic.

Return type:

str

property dataset_id: str | None#

Unique identifier of the dataset containing this topic, if applicable.

Return type:

Optional[str]

property default_representation: roboto.domain.topics.record.RepresentationRecord | None#

Default representation used for accessing this topic’s data.

Return type:

Optional[roboto.domain.topics.record.RepresentationRecord]

delete()#

Delete this topic from the Roboto platform.

Permanently removes this topic and all its associated message paths and representations from the platform. This operation cannot be undone.

Raises:
Return type:

None

Examples

>>> topic = Topic.from_id("topic_xyz789")
>>> topic.delete()
>>> # Topic and all its data are now permanently deleted
property end_time: int | None#

End time of the topic data in nanoseconds since UNIX epoch.

Return type:

Optional[int]

property file_id: str | None#

Unique identifier of the file containing this topic, if applicable.

Return type:

Optional[str]

classmethod from_id(topic_id, roboto_client=None)#

Retrieve a topic by its unique identifier.

Fetches a topic record from the Roboto platform using its unique topic ID. This is the most direct way to access a specific topic when you know its identifier.

Parameters:
  • topic_id (str) – Unique identifier for the topic.

  • roboto_client (Optional[roboto.http.RobotoClient]) – HTTP client for API communication. If None, uses the default client.

Returns:

Topic instance representing the requested topic.

Raises:
Return type:

Topic

Examples

>>> topic = Topic.from_id("topic_xyz789")
>>> print(topic.name)
/camera/image
>>> print(topic.message_count)
100
classmethod from_name_and_file(topic_name, file_id, owner_org_id=None, roboto_client=None)#

Retrieve a topic by its name and associated file.

Fetches a topic record using its name and the file it’s associated with. This is useful when you know the topic name (e.g., “/camera/image”) and the file containing the topic data.

Parameters:
  • topic_name (str) – Name of the topic to retrieve.

  • file_id (str) – Unique identifier of the file containing the topic.

  • owner_org_id (Optional[str]) – Organization ID to scope the search. If None, uses caller’s org.

  • roboto_client (Optional[roboto.http.RobotoClient]) – HTTP client for API communication. If None, uses the default client.

Returns:

Topic instance representing the requested topic.

Raises:
Return type:

Topic

Examples

>>> topic = Topic.from_name_and_file(
...     topic_name="/camera/image",
...     file_id="file_abc123"
... )
>>> print(topic.topic_id)
topic_xyz789
>>> print(len(topic.message_paths))
5
classmethod get_by_dataset(dataset_id, roboto_client=None)#

List all topics associated with files in a dataset.

Retrieves all topics from files within the specified dataset. If multiple files contain topics with the same name (e.g., chunked files with the same schema), they are returned as separate topic objects.

Parameters:
  • dataset_id (str) – Unique identifier of the dataset to search.

  • roboto_client (Optional[roboto.http.RobotoClient]) – HTTP client for API communication. If None, uses the default client.

Yields:

Topic instances associated with files in the dataset.

Raises:
Return type:

collections.abc.Generator[Topic, None, None]

Examples

>>> for topic in Topic.get_by_dataset("ds_abc123"):
...     print(f"Topic: {topic.name} (File: {topic.file_id})")
Topic: /camera/image (File: file_001)
Topic: /imu/data (File: file_001)
Topic: /camera/image (File: file_002)
Topic: /imu/data (File: file_002)
>>> # Count topics by name
>>> from collections import Counter
>>> topic_names = [topic.name for topic in Topic.get_by_dataset("ds_abc123")]
>>> print(Counter(topic_names))
Counter({'/camera/image': 2, '/imu/data': 2})
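
Because chunked files yield one topic object per file, it can be useful to combine them into a single time span per topic name. The sketch below does this with a stand-in dataclass, TopicChunk, which is hypothetical and only mirrors the name, start_time, and end_time attributes documented for Topic; with the real SDK, the objects yielded by Topic.get_by_dataset could presumably be passed in the same way.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple


@dataclass(frozen=True)
class TopicChunk:
    """Hypothetical stand-in exposing a few attributes documented on Topic."""

    name: str
    start_time: Optional[int]  # nanoseconds since UNIX epoch
    end_time: Optional[int]  # nanoseconds since UNIX epoch


def logical_time_spans(chunks: Iterable[TopicChunk]) -> Dict[str, Tuple[int, int]]:
    """Return (earliest start, latest end) per topic name, skipping unknown times."""
    spans: Dict[str, Tuple[int, int]] = {}
    for chunk in chunks:
        if chunk.start_time is None or chunk.end_time is None:
            continue  # start_time and end_time are Optional, so they may be missing
        start, end = spans.get(chunk.name, (chunk.start_time, chunk.end_time))
        spans[chunk.name] = (min(start, chunk.start_time), max(end, chunk.end_time))
    return spans


chunks = [
    TopicChunk("/camera/image", 100, 200),
    TopicChunk("/imu/data", 100, 210),
    TopicChunk("/camera/image", 200, 300),
    TopicChunk("/imu/data", None, None),
]
spans = logical_time_spans(chunks)
print(spans)
```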
classmethod get_by_file(file_id, owner_org_id=None, roboto_client=None)#

List all topics associated with a specific file.

Retrieves all topics contained within the specified file. This is useful for exploring the structure of robotics data files and understanding what data streams are available.

Parameters:
  • file_id (str) – Unique identifier of the file to search.

  • owner_org_id (Optional[str]) – Organization ID to scope the search. If None, uses caller’s org.

  • roboto_client (Optional[roboto.http.RobotoClient]) – HTTP client for API communication. If None, uses the default client.

Yields:

Topic instances associated with the specified file.

Raises:
Return type:

collections.abc.Generator[Topic, None, None]

Examples

>>> for topic in Topic.get_by_file("file_abc123"):
...     print(f"Topic: {topic.name} ({topic.message_count} messages)")
Topic: /camera/image (150 messages)
Topic: /imu/data (1500 messages)
Topic: /gps/fix (50 messages)
>>> # Keep only topics whose name contains "camera"
>>> camera_topics = [
...     topic for topic in Topic.get_by_file("file_abc123")
...     if "camera" in topic.name
... ]
get_data(message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir=None)#

Return this topic’s underlying data.

Retrieves and yields data records from this topic, with optional filtering by message paths and time range. Each yielded datum is a dictionary that matches this topic’s schema.

Parameters:
  • message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths that match attributes of individual data records to include. If None, all paths are included.

  • message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths that match attributes of individual data records to exclude. If None, no paths are excluded.

  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • cache_dir (Union[str, pathlib.Path, None]) – Directory where topic data will be downloaded if necessary. Defaults to DEFAULT_CACHE_DIR.

Yields:

Dictionary records that match this topic’s schema, filtered according to the parameters.

Return type:

collections.abc.Generator[dict[str, Any], None, None]

Notes

For each example below, assume the following is a sample datum record that can be found in this topic:

{
    "angular_velocity": {
        "x": <uint32>, "y": <uint32>, "z": <uint32>
    },
    "orientation": {
        "x": <uint32>, "y": <uint32>, "z": <uint32>, "w": <uint32>
    }
}

Examples

Print all data to stdout:

>>> topic = Topic.from_name_and_file(...)
>>> for record in topic.get_data():
...     print(record)

Only include the “angular_velocity” sub-object, but filter out its “y” property:

>>> topic = Topic.from_name_and_file(...)
>>> for record in topic.get_data(
...     message_paths_include=["angular_velocity"],
...     message_paths_exclude=["angular_velocity.y"],
... ):
...     print(record)
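
To make the effect of combining message_paths_include and message_paths_exclude concrete, here is a pure-Python sketch applied to a record shaped like the sample datum in the Notes. It is not the SDK's implementation: the function filter_record and its prefix-matching rule (a path matches a pattern if it equals the pattern or is nested under it) are assumptions made for illustration.

```python
from typing import Any, Dict, Iterable, Optional, Tuple


def _is_covered(path: str, patterns: Tuple[str, ...]) -> bool:
    """True if path equals a pattern or is nested beneath one."""
    return any(path == p or path.startswith(p + ".") for p in patterns)


def _filter(record: Dict[str, Any], include: Optional[Tuple[str, ...]],
            exclude: Tuple[str, ...], prefix: str) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if _is_covered(path, exclude):
            continue  # exclusion wins over inclusion in this sketch
        if isinstance(value, dict):
            child = _filter(value, include, exclude, path)
            if child:  # drop sub-objects left empty by the filters
                out[key] = child
        elif include is None or _is_covered(path, include):
            out[key] = value
    return out


def filter_record(record: Dict[str, Any],
                  include: Optional[Iterable[str]] = None,
                  exclude: Optional[Iterable[str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: keep included dot paths, then remove excluded ones."""
    include_t = tuple(include) if include is not None else None
    exclude_t = tuple(exclude) if exclude is not None else ()
    return _filter(record, include_t, exclude_t, "")


sample = {
    "angular_velocity": {"x": 1, "y": 2, "z": 3},
    "orientation": {"x": 0, "y": 0, "z": 0, "w": 1},
}
filtered = filter_record(
    sample,
    include=["angular_velocity"],
    exclude=["angular_velocity.y"],
)
print(filtered)
```

Under these assumed rules, the filtered record keeps only the x and z properties of angular_velocity, which is the outcome the example above describes.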

Only include data between two timestamps:

>>> topic = Topic.from_name_and_file(...)
>>> for record in topic.get_data(
...     start_time=1722870127699468923,
...     end_time=1722870127699468924,
... ):
...     print(record)
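
Since start_time is inclusive and end_time is exclusive, the two timestamps above describe a half-open window that only admits records logged at exactly 1722870127699468923. The sketch below reproduces that rule on plain dictionaries. The helper in_window is hypothetical; the "log_time" key follows the LOG_TIME_ATTR_NAME value documented for TopicDataService.

```python
from typing import Any, Dict, Iterable, Iterator, Optional


def in_window(records: Iterable[Dict[str, Any]],
              start_time: Optional[int] = None,
              end_time: Optional[int] = None) -> Iterator[Dict[str, Any]]:
    """Yield records whose log_time satisfies start_time <= log_time < end_time."""
    for record in records:
        log_time = record["log_time"]
        if start_time is not None and log_time < start_time:
            continue  # before the inclusive lower bound
        if end_time is not None and log_time >= end_time:
            continue  # at or after the exclusive upper bound
        yield record


records = [
    {"log_time": 1722870127699468922},
    {"log_time": 1722870127699468923},
    {"log_time": 1722870127699468924},
]
selected = list(
    in_window(
        records,
        start_time=1722870127699468923,
        end_time=1722870127699468924,
    )
)
print(selected)
```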

Collect all topic data into a dataframe (requires installing the roboto[analytics] extra):

>>> topic = Topic.from_name_and_file(...)
>>> df = topic.get_data_as_df()
get_data_as_df(message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir=None)#

Return this topic’s underlying data as a pandas DataFrame.

Retrieves topic data and converts it to a pandas DataFrame for analysis and visualization. The DataFrame is indexed by log time and contains columns for each message path in the topic data.

Parameters:
  • message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths that match attributes of individual data records to include. If None, all paths are included.

  • message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths that match attributes of individual data records to exclude. If None, no paths are excluded.

  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) as nanoseconds since UNIX epoch or convertible to such by to_epoch_nanoseconds().

  • cache_dir (Union[str, pathlib.Path, None]) – Directory where topic data will be downloaded if necessary. Defaults to DEFAULT_CACHE_DIR.

Returns:

pandas DataFrame containing the topic data, indexed by log time.

Raises:

ImportError – pandas is not installed. Install with roboto[analytics] extra.

Return type:

pandas.DataFrame

Notes

Requires installing this package using the roboto[analytics] extra.

Examples

>>> topic = Topic.from_name_and_file("/imu/data", "file_abc123")
>>> df = topic.get_data_as_df()
>>> print(df.head())
                        angular_velocity.x  angular_velocity.y  ...
log_time
1722870127699468923                  0.1                 0.2  ...
1722870127699468924                  0.15                0.25 ...
>>> # Filter specific message paths
>>> df_filtered = topic.get_data_as_df(
...     message_paths_include=["angular_velocity"],
...     message_paths_exclude=["angular_velocity.z"]
... )
>>> print(df_filtered.columns.tolist())
['angular_velocity.x', 'angular_velocity.y']
get_message_path(message_path)#

Get a specific message path from this topic.

Retrieves a MessagePath object for the specified path, enabling access to individual fields or signals within the topic’s data schema.

Parameters:

message_path (str) – Dot-delimited path to the desired attribute (e.g., “pose.position.x”).

Returns:

MessagePath instance for the specified path.

Raises:

ValueError – No message path with the given name exists in this topic.

Return type:

roboto.domain.topics.message_path.MessagePath

Examples

>>> topic = Topic.from_name_and_file("/imu/data", "file_abc123")
>>> angular_vel_x = topic.get_message_path("angular_velocity.x")
>>> print(angular_vel_x.canonical_data_type)
CanonicalDataType.Number
>>> # Access message path statistics
>>> print(angular_vel_x.mean)
0.125
>>> print(angular_vel_x.std_dev)
0.05
property message_count: int | None#

Total number of messages in this topic.

Return type:

Optional[int]

property message_paths: collections.abc.Sequence[roboto.domain.topics.record.MessagePathRecord]#

Sequence of message path records defining the topic’s schema.

Return type:

collections.abc.Sequence[roboto.domain.topics.record.MessagePathRecord]

property metadata: dict[str, Any]#

Metadata dictionary associated with this topic.

Return type:

dict[str, Any]

property modified: datetime.datetime#

Timestamp when this topic was last modified.

Return type:

datetime.datetime

property modified_by: str#

Identifier of the user or system that last modified this topic.

Return type:

str

property name: str#

Name of the topic (e.g., ‘/camera/image’, ‘/imu/data’).

Return type:

str

property org_id: str#

Organization ID that owns this topic.

Return type:

str

property record: roboto.domain.topics.record.TopicRecord#

Topic representation in the Roboto database.

This property is on the path to deprecation. All TopicRecord attributes are accessible directly using a Topic instance.

Return type:

roboto.domain.topics.record.TopicRecord

refresh()#

Refresh this topic instance with the latest data from the platform.

Fetches the current state of the topic from the Roboto platform and updates this instance’s data. Useful when the topic may have been modified by other processes or users.

Examples

>>> topic = Topic.from_id("topic_xyz789")
>>> # Topic may have been updated by another process
>>> topic.refresh()
>>> print(f"Current message count: {topic.message_count}")
Return type:

None

property schema_checksum: str | None#

Checksum of the topic’s message schema for validation.

Return type:

Optional[str]

property schema_name: str | None#

Name of the message schema (e.g., ‘sensor_msgs/Image’).

Return type:

Optional[str]

set_default_representation(association, storage_format, version)#

Set the default representation for this topic.

Designates a specific representation as the default for this topic, which will be used when accessing topic data without specifying a particular representation.

Parameters:
Returns:

RepresentationRecord representing the newly set default representation.

Raises:
Return type:

roboto.domain.topics.record.RepresentationRecord

Examples

>>> from roboto.association import Association
>>> from roboto.domain.topics import RepresentationStorageFormat
>>> topic = Topic.from_id("topic_xyz789")
>>> default_repr = topic.set_default_representation(
...     association=Association.file("file_repr_456"),
...     storage_format=RepresentationStorageFormat.MCAP,
...     version=2
... )
>>> print(topic.default_representation.representation_id)
repr_789
property start_time: int | None#

Start time of the topic data in nanoseconds since UNIX epoch.

Return type:

Optional[int]

to_association()#

Convert this topic to an Association object.

Creates an Association object that can be used to reference this topic in other parts of the Roboto platform.

Returns:

Association object representing this topic.

Return type:

roboto.association.Association

Examples

>>> topic = Topic.from_id("topic_xyz789")
>>> association = topic.to_association()
>>> print(association.association_type)
AssociationType.Topic
>>> print(association.association_id)
topic_xyz789
property topic_id: str#

Unique identifier for this topic.

Return type:

str

property topic_name: str#

Name of the topic (e.g., ‘/camera/image’, ‘/imu/data’).

Return type:

str

update(end_time=NotSet, message_count=NotSet, schema_checksum=NotSet, schema_name=NotSet, start_time=NotSet, metadata_changeset=NotSet, message_path_changeset=NotSet)#

Update this topic’s attributes and, optionally, its message paths.

Parameters:
Returns:

This Topic object with any updates applied.

Raises:
  • RobotoInvalidRequestException – if any method argument has an invalid value, e.g. a negative message_count

  • RobotoConflictException – if the update attempts to add a message path that already exists on this topic and replace_all is not set on the message_path_changeset

Return type:

Topic

update_message_path(message_path, metadata_changeset=NotSet, data_type=NotSet, canonical_data_type=NotSet)#

Update the metadata and attributes of a message path.

Modifies an existing message path within this topic, allowing updates to its metadata, data type, and canonical data type. This is useful for correcting or enhancing message path definitions after initial creation.

Parameters:
Returns:

MessagePath instance representing the updated message path.

Raises:
Return type:

roboto.domain.topics.message_path.MessagePath

Examples

>>> from roboto.updates import TaglessMetadataChangeset
>>> from roboto.domain.topics import CanonicalDataType
>>> topic = Topic.from_id("topic_xyz789")
>>>
>>> # Update metadata for a message path
>>> changeset = TaglessMetadataChangeset(put_fields={"unit": "meters"})
>>> updated_path = topic.update_message_path(
...     message_path="pose.position.x",
...     metadata_changeset=changeset
... )
>>> print(updated_path.metadata["unit"])
meters
>>> # Update data type and canonical type
>>> updated_path = topic.update_message_path(
...     message_path="velocity",
...     data_type="float64",
...     canonical_data_type=CanonicalDataType.Number
... )
property url_quoted_name: str#

URL-encoded version of the topic name for use in API calls.

Return type:

str
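
Topic names such as '/camera/image' contain slashes, which would otherwise be read as URL path separators. The sketch below shows one way to percent-encode such a name with the standard library. Whether the SDK also encodes slashes (safe="") is an assumption here, so treat the exact output as illustrative rather than as the value url_quoted_name returns.

```python
from urllib.parse import quote, unquote

topic_name = "/camera/image"

# safe="" asks quote() to percent-encode "/" as well (assumed behavior,
# not confirmed against the SDK).
quoted = quote(topic_name, safe="")
print(quoted)

# Decoding restores the original topic name.
restored = unquote(quoted)
print(restored)
```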

class roboto.domain.topics.TopicDataService(roboto_client, cache_dir=None)#

Internal service for retrieving topic data.

This service handles the low-level operations for accessing topic data that has been ingested by the Roboto platform. It manages downloads, filtering, and processing various data formats to provide efficient access to time-series robotics data.

Note

This is not intended as a public API. To access topic data, prefer the get_data or get_data_as_df methods on Topic, MessagePath, or Event.

Parameters:
DEFAULT_CACHE_DIR: ClassVar[pathlib.Path]#
LOG_TIME_ATTR_NAME: ClassVar[str] = 'log_time'#
get_data(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, log_time_unit=TimeUnit.Nanoseconds, cache_dir_override=None)#

Retrieve data for a specific topic with optional filtering.

Downloads and processes topic data representations, applying message path and temporal filters as specified. Merges data from multiple representations when necessary.

Parameters:
  • topic_id (str) – Unique identifier of the topic to retrieve data for.

  • message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.

  • message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.

  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.

  • log_time_unit (roboto.time.TimeUnit) – Time unit for the log_time field in the returned records.

  • cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.

Yields:

Dictionary records containing the filtered topic data, with a ‘log_time’ field indicating the timestamp of each record.

Return type:

collections.abc.Generator[dict[str, Any], None, None]

get_data_as_df(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, log_time_unit=TimeUnit.Nanoseconds, cache_dir_override=None)#

Retrieve data for a specific topic as a pandas DataFrame with optional filtering.

Downloads and processes topic data representations, applying message path and temporal filters as specified. Merges data from multiple representations when necessary and returns the result as a pandas DataFrame.

Parameters:
  • topic_id (str) – Unique identifier of the topic to retrieve data for.

  • message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.

  • message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.

  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.

  • log_time_unit (roboto.time.TimeUnit) – Time unit for the log_time field in the returned DataFrame.

  • cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.

Returns:

pandas.DataFrame containing the filtered topic data, with log_time as the index if available.

Return type:

pandas.DataFrame

class roboto.domain.topics.TopicRecord(/, **data)#

Bases: pydantic.BaseModel

Record representing a topic in the Roboto platform.

A topic is a collection of timestamped data records that share a common name and association (typically a file). Topics represent logical data streams from robotics systems, such as sensor readings, robot state information, or other time-series data.

Data from the same file with the same topic name are considered part of the same topic. Data from different files or with different topic names belong to separate topics, even if they have similar schemas.

When source files are chunked by time or size but represent the same logical data collection, they will produce multiple topic records for the same “logical topic” (same name and schema) across those chunks.

Parameters:

data (Any)

association: roboto.association.Association#

Identifier and entity type with which this Topic is associated. E.g., a file, a dataset.

created: datetime.datetime#
created_by: str#
default_representation: RepresentationRecord | None = None#

Default Representation for this Topic. Assume that if a MessagePath is not more specifically associated with a Representation, it should use this one.

end_time: int | None = None#

Timestamp of latest message in topic, in nanoseconds since epoch (assumed Unix epoch).

message_count: int | None = None#
message_paths: collections.abc.MutableSequence[MessagePathRecord] = None#

Zero to many MessagePathRecords associated with this Topic.

metadata: collections.abc.Mapping[str, Any] = None#

Arbitrary metadata.

modified: datetime.datetime#
modified_by: str#
org_id: str#
schema_checksum: str | None = None#

Checksum of topic schema. May be None if topic does not have a known/named schema.

schema_name: str | None = None#

Type of messages in topic. E.g., “sensor_msgs/PointCloud2”. May be None if topic does not have a known/named schema.

start_time: int | None = None#

Timestamp of earliest message in topic, in nanoseconds since epoch (assumed Unix epoch).

topic_id: str#
topic_name: str#
class roboto.domain.topics.UpdateMessagePathRequest(/, **data)#

Bases: pydantic.BaseModel

Request to update an existing message path within a topic.

Allows modification of message path attributes including metadata, data type, and canonical data type. Used to correct or enhance message path definitions after initial creation.

Parameters:

data (Any)

canonical_data_type: roboto.domain.topics.record.CanonicalDataType | roboto.sentinels.NotSetType#

Canonical Roboto data type for the data under this message path (optional).

Note: updating this attribute should be done with care, as it affects Roboto’s ability to interpret and visualize the data.

data_type: str | roboto.sentinels.NotSetType#

Native data type for the data under this message path (optional).

has_updates()#

Check whether this request would result in any message path modifications.

Returns:

True if the request contains changes that would modify the message path.

Return type:

bool

message_path: str#

Message path name (required).

metadata_changeset: roboto.updates.TaglessMetadataChangeset | roboto.sentinels.NotSetType#

A set of changes to the message path’s metadata (optional).

model_config#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class roboto.domain.topics.UpdateTopicRequest(/, **data)#

Bases: pydantic.BaseModel

Request to update an existing topic’s properties.

Allows modification of topic attributes including temporal boundaries, message count, schema information, metadata, and message paths.

Parameters:

data (Any)

end_time: int | None | roboto.sentinels.NotSetType#
message_count: int | roboto.sentinels.NotSetType#
message_path_changeset: MessagePathChangeset | roboto.sentinels.NotSetType#
metadata_changeset: roboto.updates.MetadataChangeset | roboto.sentinels.NotSetType#
model_config#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

schema_checksum: str | None | roboto.sentinels.NotSetType#
schema_name: str | None | roboto.sentinels.NotSetType#
start_time: int | None | roboto.sentinels.NotSetType#