roboto.domain.topics.topic_data_service

Module Contents

class roboto.domain.topics.topic_data_service.TopicDataService(roboto_client, cache_dir=None)

Internal service for retrieving topic data.

This service handles the low-level operations for accessing topic data that has been ingested by the Roboto platform. It manages downloading, filtering, and processing data in various formats to provide efficient access to time-series robotics data.

Note

This is not intended as a public API. To access topic data, prefer the get_data or get_data_as_df methods on Topic, MessagePath, or Event.

Parameters:
  • roboto_client

  • cache_dir – Optional; defaults to None.

DEFAULT_CACHE_DIR: ClassVar[pathlib.Path]

get_data(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir_override=None)

Retrieve data for a specific topic with optional filtering.

Parameters:
  • topic_id (str) – Unique identifier of the topic to retrieve data for.

  • message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.

  • message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.

  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.

  • cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.
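
The include and exclude arguments are described only as dot notation paths, so the sketch below illustrates one plausible reading using a plain nested dict. It assumes that a path matches itself and anything nested beneath it, and that an exclusion wins over an inclusion. The helpers flatten, matches, and select_paths are hypothetical and are not part of roboto; the service's actual matching rules may differ.

```python
from collections.abc import Iterable
from typing import Any, Optional


def flatten(record: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten a nested dict into {"a.b.c": value} form (hypothetical helper)."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


def matches(path: str, candidates: set[str]) -> bool:
    """True if path equals a candidate or is nested beneath one."""
    return any(path == c or path.startswith(c + ".") for c in candidates)


def select_paths(
    record: dict[str, Any],
    include: Optional[Iterable[str]] = None,
    exclude: Optional[Iterable[str]] = None,
) -> dict[str, Any]:
    """Assumed semantics: include=None keeps everything; exclude wins over include."""
    include_set = set(include) if include is not None else None
    exclude_set = set(exclude) if exclude is not None else set()
    return {
        path: value
        for path, value in flatten(record).items()
        if (include_set is None or matches(path, include_set))
        and not matches(path, exclude_set)
    }


record = {"pose": {"position": {"x": 1.0, "y": 2.0}, "frame": "map"}, "seq": 7}
selected = select_paths(record, include=["pose"], exclude=["pose.frame"])
print(selected)
```

Under these assumptions, including "pose" keeps every leaf beneath it, and excluding "pose.frame" then removes that single leaf.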

Yields:

Tuples of (timestamp, record), where timestamp is in nanoseconds since the Unix epoch.

Return type:

collections.abc.Generator[tuple[roboto.domain.topics.topic_reader.Timestamp, dict[str, Any]], None, None]
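
To make the yielded shape and the time bounds concrete, the sketch below substitutes a stand-in generator, fake_get_data, for the real service, which needs a Roboto client and ingested data. It assumes plain integer nanosecond values for both the timestamps and the bounds, whereas the real method accepts roboto.time.Time and yields topic_reader.Timestamp. SAMPLES, fake_get_data, and ns_to_datetime are hypothetical names used only for illustration.

```python
import datetime
from collections.abc import Generator
from typing import Any, Optional

# Stand-in data: (timestamp in nanoseconds since the Unix epoch, record).
SAMPLES: list[tuple[int, dict[str, Any]]] = [
    (1_700_000_000_000_000_000, {"speed": 0.0}),
    (1_700_000_001_000_000_000, {"speed": 0.5}),
    (1_700_000_002_000_000_000, {"speed": 1.2}),
]


def fake_get_data(
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
) -> Generator[tuple[int, dict[str, Any]], None, None]:
    """Hypothetical stand-in: start_time is inclusive, end_time is exclusive."""
    for timestamp_ns, record in SAMPLES:
        if start_time is not None and timestamp_ns < start_time:
            continue
        if end_time is not None and timestamp_ns >= end_time:
            continue
        yield timestamp_ns, record


def ns_to_datetime(timestamp_ns: int) -> datetime.datetime:
    """Convert epoch nanoseconds to an aware UTC datetime (microsecond precision)."""
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    base = datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)
    return base + datetime.timedelta(microseconds=nanos // 1_000)


# Half-open window: the sample at the start bound is kept, the one at the end bound is not.
window = list(
    fake_get_data(
        start_time=1_700_000_001_000_000_000,
        end_time=1_700_000_002_000_000_000,
    )
)
for timestamp_ns, record in window:
    print(ns_to_datetime(timestamp_ns).isoformat(), record)
```

Because the result is a generator, records are produced lazily; wrapping it in list() as above is only sensible when the window is small.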

get_data_as_df(topic_id, message_paths_include=None, message_paths_exclude=None, start_time=None, end_time=None, cache_dir_override=None)

Retrieve data for a specific topic as a pandas DataFrame with optional filtering.

Parameters:
  • topic_id (str) – Unique identifier of the topic to retrieve data for.

  • message_paths_include (Optional[collections.abc.Iterable[str]]) – Dot notation paths to include in the results. If None, all paths are included.

  • message_paths_exclude (Optional[collections.abc.Iterable[str]]) – Dot notation paths to exclude from the results. If None, no paths are excluded.

  • start_time (Optional[roboto.time.Time]) – Start time (inclusive) for temporal filtering.

  • end_time (Optional[roboto.time.Time]) – End time (exclusive) for temporal filtering.

  • cache_dir_override (Union[str, pathlib.Path, None]) – Override the default cache directory for downloads.

Returns:

A pandas.DataFrame containing the topic data. Its index (df.index) is a pandas.DatetimeIndex holding the timestamp of each row.

Return type:

pandas.DataFrame
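
As a rough picture of the returned shape, the sketch below builds a DataFrame with a DatetimeIndex from stand-in (timestamp, record) rows. It assumes integer nanosecond timestamps and flat records with made-up column names (speed, heading); the real column layout depends on the topic's message paths and is not specified here.

```python
import pandas as pd

# Stand-in rows: (timestamp in nanoseconds since the Unix epoch, record).
rows = [
    (1_700_000_000_000_000_000, {"speed": 0.0, "heading": 90.0}),
    (1_700_000_001_000_000_000, {"speed": 0.5, "heading": 91.5}),
    (1_700_000_002_000_000_000, {"speed": 1.2, "heading": 93.0}),
]

timestamps = [timestamp_ns for timestamp_ns, _ in rows]
records = [record for _, record in rows]

# Interpret the integers as epoch nanoseconds to get a DatetimeIndex.
index = pd.to_datetime(timestamps, unit="ns")
df = pd.DataFrame(records, index=index)

# A DatetimeIndex supports label-based slicing by time (both ends inclusive).
first_two = df.loc["2023-11-14 22:13:20":"2023-11-14 22:13:21"]
print(first_two)
```

Note that label-based slicing on the index includes both endpoints, unlike the start_time and end_time arguments, where the end is exclusive.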

roboto.domain.topics.topic_data_service.iterable_to_set(arg)

Parameters:

arg (collections.abc.Iterable[str])

Return type:

set[str]
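
This helper carries no description, so the sketch below only illustrates what the signature implies, assuming it simply materializes the iterable as a set. The function iterable_to_set_sketch is a hypothetical equivalent, not the library's implementation. It also shows a general Python pitfall that applies to any Iterable[str] argument: a bare string is itself an iterable of characters.

```python
from collections.abc import Iterable


def iterable_to_set_sketch(arg: Iterable[str]) -> set[str]:
    """Hypothetical equivalent: materialize an iterable of strings as a set."""
    return set(arg)


# Duplicates collapse, and ordering is not preserved.
paths = iterable_to_set_sketch(["pose.x", "pose.y", "pose.x"])

# Wrap a lone path in a list to keep it as one element.
single = iterable_to_set_sketch(["pose"])

# A bare str iterates character by character.
chars = iterable_to_set_sketch("pose")

print(paths, single, chars)
```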

roboto.domain.topics.topic_data_service.logger