roboto.domain.topics.topic_data_service
Module Contents
- class roboto.domain.topics.topic_data_service.TopicDataService(roboto_client, cache_dir=None)
Internal service for retrieving topic data.
This service handles the low-level operations for accessing topic data that has been ingested by the Roboto platform. It manages the downloading, filtering, and processing of data in various formats to provide efficient access to time-series robotics data.
Note
This is not intended as a public API. To access topic data, prefer the get_data or get_data_as_df methods on Topic, MessagePath, or Event.
- Parameters:
roboto_client (roboto.http.RobotoClient)
cache_dir (Union[str, pathlib.Path, None])
- DEFAULT_CACHE_DIR: ClassVar[pathlib.Path]
- LOG_TIME_ATTR_NAME: ClassVar[str] = 'log_time'
- get_data(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, log_time_unit=TimeUnit.Nanoseconds, cache_dir_override=None)
Retrieve data for a specific topic with optional filtering.
Downloads and processes topic data representations, applying message path and temporal filters as specified. Merges data from multiple representations when necessary.
- Parameters:
topic_id (str) – Unique identifier of the topic to retrieve data for.
message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.
message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.
start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.
end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.
log_time_unit (roboto.time.TimeUnit) – Time unit for the log_time field in the returned records.
cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.
- Yields:
Dictionary records containing the filtered topic data, with a ‘log_time’ field indicating the timestamp of each record.
- Return type:
collections.abc.Generator[dict[str, Any], None, None]
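The filtering rules described above can be illustrated with a minimal sketch. It does not call the Roboto service: `StubTopicData` is a hypothetical in-memory stand-in whose `get_data` accepts a subset of the documented keyword arguments. Several details are assumptions made for illustration: times are plain integer nanoseconds rather than `roboto.time.Time` values, records are flat dictionaries keyed by dot-notation path, and paths are matched exactly. The real record layout and matching rules may differ.

```python
from collections.abc import Generator, Iterable
from typing import Any, Optional


class StubTopicData:
    """Hypothetical in-memory stand-in; NOT the real TopicDataService."""

    def __init__(self, records: list[dict[str, Any]]) -> None:
        self._records = records

    def get_data(
        self,
        topic_id: str,  # unused by the stub; kept to mirror the documented signature
        message_paths_include: Optional[Iterable[str]] = None,
        message_paths_exclude: Optional[Iterable[str]] = None,
        start_time: Optional[int] = None,  # assumption: integer nanoseconds
        end_time: Optional[int] = None,  # assumption: integer nanoseconds
    ) -> Generator[dict[str, Any], None, None]:
        include = None if message_paths_include is None else set(message_paths_include)
        exclude = set() if message_paths_exclude is None else set(message_paths_exclude)
        for record in self._records:
            log_time = record["log_time"]
            # start_time is inclusive, end_time is exclusive.
            if start_time is not None and log_time < start_time:
                continue
            if end_time is not None and log_time >= end_time:
                continue
            # Keep log_time, plus every path that passes both filters.
            yield {
                path: value
                for path, value in record.items()
                if path == "log_time"
                or ((include is None or path in include) and path not in exclude)
            }


service = StubTopicData(
    [
        {"log_time": 100, "pose.x": 1.0, "pose.y": 2.0},
        {"log_time": 200, "pose.x": 1.5, "pose.y": 2.5},
        {"log_time": 300, "pose.x": 2.0, "pose.y": 3.0},
    ]
)

# The record at log_time == 300 is dropped because end_time is exclusive.
selected = list(
    service.get_data(
        "tp_example",  # hypothetical topic ID
        message_paths_include=["pose.x"],
        start_time=100,
        end_time=300,
    )
)
```

Because the method is a generator, records are produced lazily; wrapping the call in `list(...)` as above materializes them all at once.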
- get_data_as_df(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, log_time_unit=TimeUnit.Nanoseconds, cache_dir_override=None)
Retrieve data for a specific topic as a pandas DataFrame with optional filtering.
Downloads and processes topic data representations, applying message path and temporal filters as specified. Merges data from multiple representations when necessary and returns the result as a pandas DataFrame.
- Parameters:
topic_id (str) – Unique identifier of the topic to retrieve data for.
message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.
message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.
start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.
end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.
log_time_unit (roboto.time.TimeUnit) – Time unit for the log_time field in the returned DataFrame.
cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.
- Returns:
pandas.DataFrame containing the filtered topic data, with log_time as the index if available.
- Return type:
pandas.DataFrame
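The `log_time_unit` parameter, accepted by both `get_data` and `get_data_as_df`, sets the unit of the returned `log_time` values. The sketch below shows what such a rescaling could look like. It rests on assumptions: `Unit` is a hypothetical stand-in for `roboto.time.TimeUnit` (only `Nanoseconds` is confirmed by the signature above), the source timestamps are assumed to be integer nanoseconds, and integer division is an illustrative choice, not necessarily how the service converts values.

```python
from enum import Enum
from typing import Any


class Unit(Enum):
    """Hypothetical stand-in for roboto.time.TimeUnit; values are nanoseconds per unit."""

    Nanoseconds = 1
    Microseconds = 1_000
    Milliseconds = 1_000_000
    Seconds = 1_000_000_000


def rescale_log_time(record: dict[str, Any], unit: Unit) -> dict[str, Any]:
    """Return a copy of record with log_time converted from nanoseconds to unit."""
    converted = dict(record)
    # Integer division drops the sub-unit remainder. Whether the real service
    # truncates, rounds, or returns floats is an open assumption here.
    converted["log_time"] = record["log_time"] // unit.value
    return converted


record = {"log_time": 1_700_000_000_123_456_789, "pose.x": 1.0}
in_ms = rescale_log_time(record, Unit.Milliseconds)
```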
- roboto.domain.topics.topic_data_service.iterable_to_set(arg)
- Parameters:
arg (collections.abc.Iterable[str])
- Return type:
set[str]
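The page gives no description for this helper, so the following is only a guess based on its signature: a function that takes an iterable of strings and returns a set. `to_set` is a hypothetical equivalent, not the library's implementation. A conversion like this is commonly useful because a set supports repeated, constant-time membership checks, whereas an arbitrary iterable (a generator, for instance) can be consumed only once.

```python
from collections.abc import Iterable


def to_set(arg: Iterable[str]) -> set[str]:
    """Hypothetical equivalent inferred from the signature alone."""
    return set(arg)


# A one-shot generator containing a duplicate path.
paths = (p for p in ["pose.x", "pose.y", "pose.x"])
path_set = to_set(paths)
```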
- roboto.domain.topics.topic_data_service.logger