roboto.domain.topics.mcap_topic_reader#
Module Contents#
- class roboto.domain.topics.mcap_topic_reader.McapTopicReader(roboto_client, cache_dir)#
Bases:
roboto.domain.topics.topic_reader.TopicReader
Private interface for retrieving topic data stored in MCAP files.
Note
This is not intended as a public API. To access topic data, prefer the
get_data
orget_data_as_df
methods onTopic
,MessagePath
, orEvent
.- Parameters:
roboto_client (roboto.http.RobotoClient)
cache_dir (pathlib.Path)
- static accepts(message_paths_to_representations)#
- Parameters:
message_paths_to_representations (collections.abc.Iterable[roboto.domain.topics.operations.MessagePathRepresentationMapping])
- Return type:
bool
- get_data(message_paths_to_representations, log_time_attr_name, log_time_unit=TimeUnit.Nanoseconds, start_time=None, end_time=None, timestamp_message_path_representation_mapping=None)#
- Parameters:
message_paths_to_representations (collections.abc.Iterable[roboto.domain.topics.operations.MessagePathRepresentationMapping])
log_time_attr_name (str)
log_time_unit (roboto.time.TimeUnit)
start_time (Optional[int])
end_time (Optional[int])
timestamp_message_path_representation_mapping (Optional[roboto.domain.topics.operations.MessagePathRepresentationMapping])
- Return type:
collections.abc.Generator[dict[str, Any], None, None]
- get_data_as_df(message_paths_to_representations, log_time_attr_name, log_time_unit=TimeUnit.Nanoseconds, start_time=None, end_time=None, timestamp_message_path_representation_mapping=None)#
- Parameters:
message_paths_to_representations (collections.abc.Iterable[roboto.domain.topics.operations.MessagePathRepresentationMapping])
log_time_attr_name (str)
log_time_unit (roboto.time.TimeUnit)
start_time (Optional[int])
end_time (Optional[int])
timestamp_message_path_representation_mapping (Optional[roboto.domain.topics.operations.MessagePathRepresentationMapping])
- Return type:
pandas.DataFrame
- roboto.domain.topics.mcap_topic_reader.OUTFILE_NAME_PATTERN = '{repr_id}_{file_id}.mcap'#
- roboto.domain.topics.mcap_topic_reader.garbage_collect_old_topic_data(cache_dir, expire_after=datetime.timedelta(days=7))#
- Parameters:
cache_dir (pathlib.Path)
expire_after (datetime.timedelta)
- roboto.domain.topics.mcap_topic_reader.logger#