roboto.domain.topics.topic_data_service#
Module Contents#
- class roboto.domain.topics.topic_data_service.TopicDataService(roboto_client, cache_dir=None)#
Internal service for retrieving topic data.
This service handles the low-level operations for accessing topic data that has been ingested by the Roboto platform. It manages downloads, filtering, and processing various data formats to provide efficient access to time-series robotics data.
Note
This is not intended as a public API. To access topic data, prefer the
get_dataorget_data_as_dfmethods onTopic,MessagePath, orEvent.- Parameters:
roboto_client (roboto.http.RobotoClient)
cache_dir (Union[str, pathlib.Path, None])
- DEFAULT_CACHE_DIR: ClassVar[pathlib.Path]#
- get_data(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir_override=None)#
Retrieve data for a specific topic with optional filtering.
- Parameters:
topic_id (str) – Unique identifier of the topic to retrieve data for.
message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.
message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.
start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.
end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.
cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.
- Yields:
Tuple of (timestamp, record) where timestamp is in nanoseconds since Unix epoch.
- Return type:
collections.abc.Generator[tuple[roboto.domain.topics.topic_reader.Timestamp, dict[str, Any]], None, None]
- get_data_as_df(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir_override=None)#
Retrieve data for a specific topic as a pandas DataFrame with optional filtering.
- Parameters:
topic_id (str) – Unique identifier of the topic to retrieve data for.
message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.
message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.
start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.
end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.
cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.
- Returns:
pandas.DataFrame
The index of the DataFrame (
df.index) is a pandas.DateTimeIndex, labeling the timestamp of each row.- Return type:
pandas.DataFrame
- roboto.domain.topics.topic_data_service.iterable_to_set(arg)#
- Parameters:
arg (collections.abc.Iterable[str])
- Return type:
set[str]
- roboto.domain.topics.topic_data_service.logger#