roboto.domain.topics.topic_data_service

Module Contents

class roboto.domain.topics.topic_data_service.TopicDataService(roboto_client, cache_dir=None)

Internal service for retrieving and managing topic data.

This service handles the low-level operations for accessing topic data that has been ingested by the Roboto platform. It manages representation downloads, filtering, and processing of various data formats to provide efficient access to time-series robotics data.

Note

This is an internal service class and is not intended as a public API. To access topic data, prefer the higher-level get_data() methods (for example Topic.get_data() or MessagePath.get_data()) instead.

Parameters:
  • roboto_client

  • cache_dir

DEFAULT_CACHE_DIR: ClassVar[pathlib.Path]

LOG_TIME_ATTR_NAME: ClassVar[str] = 'log_time'

get_data(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir_override=None)

Retrieve data for a specific topic with optional filtering.

Downloads and processes topic data representations, applying message path and temporal filters as specified. Merges data from multiple representations when necessary.

Parameters:
  • topic_id (str) – Unique identifier of the topic to retrieve data for.

  • message_paths_include (Optional[collections.abc.Sequence[str]]) – Dot notation paths to include in the results. If None, all paths are included.

  • message_paths_exclude (Optional[collections.abc.Sequence[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.

  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.

  • cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.

Yields:

Dictionary records containing the filtered topic data, with a 'log_time' field indicating the timestamp of each record.

Return type:

collections.abc.Generator[dict[str, Any], None, None]
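
Example:

The filtering semantics described above (include and exclude paths in dot notation, an inclusive start time, an exclusive end time) can be illustrated with a small stand-alone sketch. This is not the service's implementation and does not call the Roboto SDK. The helper filter_records, the flat record layout keyed by dot-notation paths, the prefix-matching rule, and the integer timestamps are all assumptions made for illustration only.

```python
from collections.abc import Iterable, Iterator, Sequence
from typing import Any, Optional

LOG_TIME = "log_time"  # same value as LOG_TIME_ATTR_NAME in this module


def _matches(path: str, patterns: Sequence[str]) -> bool:
    # Assumption: a pattern matches the path itself or any dot-notation descendant.
    return any(path == p or path.startswith(p + ".") for p in patterns)


def filter_records(
    records: Iterable[dict[str, Any]],
    include: Optional[Sequence[str]] = None,
    exclude: Optional[Sequence[str]] = None,
    start: Optional[int] = None,
    end: Optional[int] = None,
) -> Iterator[dict[str, Any]]:
    """Hypothetical stand-in that mimics the documented filter semantics."""
    for record in records:
        t = record[LOG_TIME]
        if start is not None and t < start:  # start is inclusive
            continue
        if end is not None and t >= end:  # end is exclusive
            continue
        # Keep log_time always; keep other paths only if they pass both filters.
        yield {
            key: value
            for key, value in record.items()
            if key == LOG_TIME
            or (
                (include is None or _matches(key, include))
                and (exclude is None or not _matches(key, exclude))
            )
        }


# Made-up sample records; timestamps are plain integers for simplicity.
sample = [
    {"log_time": 100, "pose.position.x": 1.0, "pose.position.y": 2.0, "status": "ok"},
    {"log_time": 200, "pose.position.x": 1.5, "pose.position.y": 2.5, "status": "ok"},
    {"log_time": 300, "pose.position.x": 2.0, "pose.position.y": 3.0, "status": "warn"},
]

result = list(
    filter_records(
        sample,
        include=["pose"],
        exclude=["pose.position.y"],
        start=100,
        end=300,
    )
)
```

Because the sketch is a generator, like get_data(), records are produced lazily, and a caller that only needs the first few can stop iterating early. In the sample above the record at time 300 is dropped because the end bound is exclusive, while the record at time 100 is kept because the start bound is inclusive.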

get_data_as_df(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir_override=None)

Parameters:
  • topic_id (str)

  • message_paths_include (Optional[collections.abc.Sequence[str]])

  • message_paths_exclude (Optional[collections.abc.Sequence[str]])

  • start_time (Optional[roboto.time.Time])

  • end_time (Optional[roboto.time.Time])

  • cache_dir_override (Union[str, pathlib.Path, None])

Return type:

pandas.DataFrame

roboto.domain.topics.topic_data_service.logger