roboto.domain.topics.mcap_topic_reader#
Module Contents#
- class roboto.domain.topics.mcap_topic_reader.McapTopicReader(roboto_client, cache_dir)#
Bases:
roboto.domain.topics.topic_reader.TopicReader
Helper class that provides a standard way to create an ABC using inheritance.
- Parameters:
roboto_client (roboto.http.RobotoClient)
cache_dir (pathlib.Path)
- static accepts(message_paths_to_representations)#
- Parameters:
message_paths_to_representations (collections.abc.Iterable[roboto.domain.topics.operations.MessagePathRepresentationMapping])
- Return type:
bool
- get_data(message_paths_to_representations, log_time_attr_name, start_time=None, end_time=None)#
- Parameters:
message_paths_to_representations (collections.abc.Iterable[roboto.domain.topics.operations.MessagePathRepresentationMapping])
log_time_attr_name (str)
start_time (Optional[int])
end_time (Optional[int])
- Return type:
collections.abc.Generator[dict[str, Any], None, None]
- get_data_as_df(message_paths_to_representations, log_time_attr_name, start_time=None, end_time=None)#
- Parameters:
message_paths_to_representations (collections.abc.Iterable[roboto.domain.topics.operations.MessagePathRepresentationMapping])
log_time_attr_name (str)
start_time (int | None)
end_time (int | None)
- Return type:
pandas.DataFrame
- roboto.domain.topics.mcap_topic_reader.OUTFILE_NAME_PATTERN = '{repr_id}_{file_id}.mcap'#
- roboto.domain.topics.mcap_topic_reader.garbage_collect_old_topic_data(cache_dir, expire_after=datetime.timedelta(days=7))#
- Parameters:
cache_dir (pathlib.Path)
expire_after (datetime.timedelta)
- roboto.domain.topics.mcap_topic_reader.logger#