roboto.domain.actions#
Actions domain module for the Roboto SDK.
This module provides the core domain entities and operations for working with Actions, Invocations, and Triggers in the Roboto platform. Actions are reusable functions that process, transform, or analyze data. Invocations represent executions of actions, and Triggers automatically invoke actions when specific events or conditions occur.
The main classes in this module are:
Action
: A reusable function to process data
Invocation
: An execution instance of an action
Trigger
: A rule that automatically invokes actions
Examples
Basic action invocation:
>>> from roboto.domain.actions import Action, InvocationSource
>>> action = Action.from_name("my_action", owner_org_id="my-org")
>>> invocation = action.invoke(
...     invocation_source=InvocationSource.Manual,
...     parameter_values={"param1": "value1"}
... )
>>> invocation.wait_for_terminal_status()
Creating a trigger:
>>> from roboto.domain.actions import Trigger, TriggerForEachPrimitive
>>> trigger = Trigger.create(
...     name="auto_process",
...     action_name="my_action",
...     required_inputs=["**/*.bag"],
...     for_each=TriggerForEachPrimitive.Dataset
... )
Submodules#
- roboto.domain.actions.action
- roboto.domain.actions.action_operations
- roboto.domain.actions.action_record
- roboto.domain.actions.invocation
- roboto.domain.actions.invocation_operations
- roboto.domain.actions.invocation_record
- roboto.domain.actions.trigger
- roboto.domain.actions.trigger_operations
- roboto.domain.actions.trigger_record
Package Contents#
- class roboto.domain.actions.Accessibility#
Bases:
str, enum.Enum
Controls who can query for and invoke an action.
Accessibility levels determine the visibility and usability of actions within the Roboto platform. Actions can be private to an organization or published publicly in the Action Hub.
Future accessibility levels may include: “user” and/or “team”.
- ActionHub = 'action_hub'#
All users of Roboto can query for and invoke the action.
- Organization = 'organization'#
All members of the organization owning the Action can query for and invoke the action.
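Because Accessibility derives from both str and enum.Enum, its members compare equal to their string values and can be looked up by value. The sketch below uses a local stand-in that mirrors the two documented members instead of importing the SDK, so it runs without Roboto installed; the real class may carry additional behavior.

```python
import enum


class Accessibility(str, enum.Enum):
    """Local stand-in mirroring the documented members (not the SDK class)."""

    ActionHub = "action_hub"
    Organization = "organization"


# Look up a member from its wire value, e.g. one read from a JSON payload.
level = Accessibility("action_hub")

# str-mixin enums compare equal to their plain string values.
is_public = level is Accessibility.ActionHub
matches_plain_string = Accessibility.Organization == "organization"
```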
- class roboto.domain.actions.Action(record, roboto_client=None)#
A reusable function to process, transform or analyze data in Roboto.
Actions are containerized functions that can be invoked to process datasets, files, or other data sources within the Roboto platform. They encapsulate processing logic, dependencies, and compute requirements, making data processing workflows reproducible and scalable.
Actions can be created, updated, invoked, and managed through this class. They support parameterization, inheritance from other actions, and can be triggered automatically based on events or conditions.
An Action consists of:
Container image and execution parameters
Input/output specifications
Compute requirements (CPU, memory, etc.)
Parameters that can be customized at invocation time
Metadata and tags for organization
Actions are owned by organizations and can have different accessibility levels (private to organization or public in the Action Hub).
- Parameters:
roboto_client (Optional[roboto.http.RobotoClient])
- property accessibility: roboto.domain.actions.action_record.Accessibility#
The accessibility level of this action (Organization or ActionHub).
- Return type:
roboto.domain.actions.action_record.Accessibility
- property compute_requirements: roboto.domain.actions.action_record.ComputeRequirements | None#
The compute requirements (CPU, memory) for running this action.
- Return type:
Optional[roboto.domain.actions.action_record.ComputeRequirements]
- property container_parameters: roboto.domain.actions.action_record.ContainerParameters | None#
The container parameters including image URI and execution settings.
- Return type:
Optional[roboto.domain.actions.action_record.ContainerParameters]
- classmethod create(name, compute_requirements=None, container_parameters=None, description=None, inherits=None, metadata=None, parameters=None, requires_downloaded_inputs=None, short_description=None, tags=None, timeout=None, uri=None, caller_org_id=None, roboto_client=None)#
Create a new action in the Roboto platform.
Creates a new action with the specified configuration. The action will be owned by the caller’s organization and can be invoked to process data.
- Parameters:
name (str) – Unique name for the action within the organization.
compute_requirements (Optional[roboto.domain.actions.action_record.ComputeRequirements]) – CPU, memory, and other compute specifications.
container_parameters (Optional[roboto.domain.actions.action_record.ContainerParameters]) – Container image URI, entrypoint, and environment variables.
description (Optional[str]) – Detailed description of what the action does.
inherits (Optional[roboto.domain.actions.action_record.ActionReference]) – Reference to another action to inherit configuration from.
metadata (Optional[dict[str, Any]]) – Custom key-value metadata to associate with the action.
parameters (Optional[list[roboto.domain.actions.action_record.ActionParameter]]) – List of parameters that can be provided at invocation time.
requires_downloaded_inputs (Optional[bool]) – Whether input files should be downloaded before execution.
short_description (Optional[str]) – Brief description (max 140 characters) for display purposes.
tags (Optional[list[str]]) – List of tags for categorizing and searching actions.
timeout (Optional[int]) – Maximum execution time in minutes before the action is terminated.
uri (Optional[str]) – Container image URI if not inheriting from another action.
caller_org_id (Optional[str]) – Organization ID to create the action in. Defaults to caller’s org.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses default if not provided.
- Returns:
The newly created Action instance.
- Raises:
RobotoIllegalArgumentException – If short_description exceeds 140 characters or other validation errors occur.
RobotoInvalidRequestException – If the request is malformed.
RobotoUnauthorizedException – If the caller lacks permission to create actions.
- Return type:
Action
Examples
Create a simple action:
>>> action = Action.create(
...     name="hello_world",
...     uri="ubuntu:latest",
...     description="A simple hello world action"
... )
Create an action with parameters and compute requirements:
>>> from roboto.domain.actions import ComputeRequirements, ActionParameter
>>> action = Action.create(
...     name="data_processor",
...     uri="my-registry.com/processor:v1.0",
...     description="Processes sensor data with configurable parameters",
...     compute_requirements=ComputeRequirements(vCPU=4096, memory=8192),
...     parameters=[
...         ActionParameter(name="threshold", required=True, description="Processing threshold"),
...         ActionParameter(name="output_format", default="json", description="Output format")
...     ],
...     tags=["data-processing", "sensors"],
...     timeout=60
... )
Create an action that inherits from another:
>>> base_action = Action.from_name("base_processor", owner_org_id="roboto-public")
>>> derived_action = Action.create(
...     name="custom_processor",
...     inherits=base_action.record.reference,
...     description="Custom processor based on base_processor",
...     metadata={"version": "2.0", "team": "data-science"}
... )
- property created: datetime.datetime#
The timestamp when this action was created.
- Return type:
datetime.datetime
- property created_by: str#
The user ID who created this action.
- Return type:
str
- delete()#
Delete this action from the Roboto platform.
Permanently removes this action and all its versions. This operation cannot be undone.
- Raises:
RobotoNotFoundException – If the action is not found.
RobotoUnauthorizedException – If the caller lacks permission to delete the action.
- Return type:
None
Examples
Delete an action:
>>> action = Action.from_name("old_action")
>>> action.delete()
- property description: str | None#
The detailed description of what this action does.
- Return type:
Optional[str]
- property digest: str#
The unique digest identifying this specific version of the action.
- Return type:
str
- classmethod from_name(name, digest=None, owner_org_id=None, roboto_client=None)#
Load an existing action by name.
Retrieves an action from the Roboto platform by its name and optionally a specific version digest. Action names are unique within an organization, so a name + org_id combination always provides a fully qualified reference to a specific action.
- Parameters:
name (str) – Name of the action to retrieve. Must be unique within the organization.
digest (Optional[str]) – Specific version digest of the action. If not provided, returns the latest version.
owner_org_id (Optional[str]) – Organization ID that owns the action. If not provided, searches in the caller’s organization.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses default if not provided.
- Returns:
The Action instance.
- Raises:
RobotoNotFoundException – If the action is not found.
RobotoUnauthorizedException – If the caller lacks permission to access the action.
- Return type:
Action
Examples
Load the latest version of an action:
>>> action = Action.from_name("data_processor")
Load a specific version of an action:
>>> action = Action.from_name(
...     "data_processor",
...     digest="abc123def456"
... )
Load an action from another organization:
>>> action = Action.from_name(
...     "public_processor",
...     owner_org_id="roboto-public"
... )
- property inherits_from: roboto.domain.actions.action_record.ActionReference | None#
Reference to another action this action inherits configuration from.
- Return type:
Optional[roboto.domain.actions.action_record.ActionReference]
- invoke(invocation_source, data_source_id=None, data_source_type=None, input_data=None, upload_destination=None, compute_requirement_overrides=None, container_parameter_overrides=None, idempotency_id=None, invocation_source_id=None, parameter_values=None, timeout=None, caller_org_id=None)#
Invokes this action using any inputs and options provided.
Executes this action with the specified parameters and returns an Invocation object that can be used to track progress and retrieve results.
- Parameters:
invocation_source (roboto.domain.actions.invocation_record.InvocationSource) – Source of the invocation (Manual, Trigger, etc.).
data_source_type (Optional[roboto.domain.actions.invocation_record.InvocationDataSourceType]) – If set, should equal Dataset for backward compatibility.
data_source_id (Optional[str]) – If set, should be a dataset ID for backward compatibility.
input_data (Optional[Union[list[str], roboto.domain.actions.invocation_record.InvocationInput]]) – Either a list of file name patterns, or an InvocationInput specification.
upload_destination (Optional[roboto.domain.actions.invocation_record.InvocationUploadDestination]) – Default upload destination (e.g. dataset) for files written to the invocation’s output directory.
compute_requirement_overrides (Optional[roboto.domain.actions.action_record.ComputeRequirements]) – Overrides for the action’s default compute requirements (e.g. vCPU)
container_parameter_overrides (Optional[roboto.domain.actions.action_record.ContainerParameters]) – Overrides for the action’s default container parameters (e.g. entrypoint)
idempotency_id (Optional[str]) – Unique ID to ensure an invocation is run exactly once.
invocation_source_id (Optional[str]) – ID of the trigger or manual operator performing the invocation.
parameter_values (Optional[dict[str, Any]]) – Action parameter values.
timeout (Optional[int]) – Action timeout in minutes.
caller_org_id (Optional[str]) – Org ID of the caller.
- Returns:
An Invocation object that can be used to track the invocation’s progress.
- Raises:
RobotoIllegalArgumentException – Invalid method parameters or combinations.
RobotoInvalidRequestException – Incorrectly formed request.
RobotoUnauthorizedException – The caller is not authorized to invoke this action.
- Return type:
Invocation
Examples
Basic invocation with a dataset:
>>> from roboto import Action, InvocationSource
>>> from roboto.domain.actions.invocation_record import (
...     InvocationDataSourceType,
...     InvocationUploadDestination,
... )
>>> action = Action.from_name("ros_ingestion", owner_org_id="roboto-public")
>>> iv = action.invoke(
...     invocation_source=InvocationSource.Manual,
...     data_source_id="ds_12345",
...     data_source_type=InvocationDataSourceType.Dataset,
...     input_data=["**/*.bag"],
...     upload_destination=InvocationUploadDestination.dataset("ds_12345")
... )
>>> iv.wait_for_terminal_status()
Invocation with compute requirement overrides:
>>> from roboto import Action, InvocationSource, ComputeRequirements
>>> action = Action.from_name("image_processing", owner_org_id="roboto-public")
>>> compute_reqs = ComputeRequirements(vCPU=4096, memory=8192)
>>> iv = action.invoke(
...     invocation_source=InvocationSource.Manual,
...     compute_requirement_overrides=compute_reqs,
...     parameter_values={"threshold": 0.75}
... )
>>> status = iv.wait_for_terminal_status()
>>> print(status)
'COMPLETED'
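The idempotency_id parameter is documented as a unique ID that ensures an invocation runs exactly once. One possible way to derive a stable ID on the caller's side is to hash the inputs that define "the same" invocation. The helper name make_idempotency_id and the choice of hashed fields are assumptions for illustration and are not part of the SDK; any length or format limits the platform places on the ID are not stated on this page.

```python
import hashlib
import json
from typing import Any, Optional


def make_idempotency_id(
    action_name: str,
    dataset_id: str,
    parameter_values: Optional[dict[str, Any]] = None,
) -> str:
    """Hypothetical helper: identical inputs always produce the same ID."""
    payload = json.dumps(
        {
            "action": action_name,
            "dataset": dataset_id,
            "params": parameter_values or {},
        },
        sort_keys=True,  # key order must not change the hash
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = make_idempotency_id("ros_ingestion", "ds_12345", {"threshold": 0.75})
retry = make_idempotency_id("ros_ingestion", "ds_12345", {"threshold": 0.75})
other = make_idempotency_id("ros_ingestion", "ds_67890", {"threshold": 0.75})
```

The resulting string could then be passed as idempotency_id to Action.invoke(), so that a retried call with identical inputs reuses the same ID.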
- property metadata: dict[str, Any]#
Custom metadata key-value pairs associated with this action.
- Return type:
dict[str, Any]
- property modified: datetime.datetime#
The timestamp when this action was last modified.
- Return type:
datetime.datetime
- property modified_by: str#
The user ID who last modified this action.
- Return type:
str
- property name: str#
The unique name of this action within its organization.
- Return type:
str
- property org_id: str#
The organization ID that owns this action.
- Return type:
str
- property parameters: collections.abc.Sequence[roboto.domain.actions.action_record.ActionParameter]#
The list of parameters that can be provided when invoking this action.
- Return type:
collections.abc.Sequence[roboto.domain.actions.action_record.ActionParameter]
- property published: datetime.datetime | None#
The timestamp when this action was published to the Action Hub, if applicable.
- Return type:
Optional[datetime.datetime]
- classmethod query(spec=None, accessibility=Accessibility.Organization, owner_org_id=None, roboto_client=None)#
Query actions with optional filtering and pagination.
Searches for actions based on the provided query specification. Can search within an organization or across the public Action Hub.
- Parameters:
spec (Optional[roboto.query.QuerySpecification]) – Query specification with filters, sorting, and pagination. If not provided, returns all accessible actions.
accessibility (roboto.domain.actions.action_record.Accessibility) – Whether to search organization actions or public Action Hub. Defaults to Organization.
owner_org_id (Optional[str]) – Organization ID to search within. If not provided, searches in the caller’s organization.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses default if not provided.
- Yields:
Action instances matching the query criteria.
- Raises:
ValueError – If the query specification contains unknown fields.
RobotoUnauthorizedException – If the caller lacks permission to query actions.
- Return type:
collections.abc.Generator[Action, None, None]
Examples
Query all actions in your organization:
>>> for action in Action.query():
...     print(f"Action: {action.name}")
Query actions with specific tags:
>>> from roboto.query import QuerySpecification
>>> spec = QuerySpecification().where("tags").contains("ml")
>>> for action in Action.query(spec):
...     print(f"ML Action: {action.name}")
Query public actions in the Action Hub:
>>> from roboto.domain.actions import Accessibility
>>> spec = QuerySpecification().where("name").contains("ros")
>>> for action in Action.query(spec, accessibility=Accessibility.ActionHub):
...     print(f"Public ROS Action: {action.name}")
Query with pagination:
>>> spec = QuerySpecification().limit(10).order_by("created", ascending=False)
>>> recent_actions = list(Action.query(spec))
>>> print(f"Found {len(recent_actions)} recent actions")
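Action.query() is documented as returning a generator, so results can be consumed incrementally instead of being collected into a list. The sketch below uses a stand-in generator (fake_query, a hypothetical name) in place of the SDK call to show taking only the first few results with itertools.islice; how the SDK fetches pages behind the generator is not described here.

```python
import itertools
from collections.abc import Generator


def fake_query(total: int) -> Generator[str, None, None]:
    """Stand-in for a generator-returning query; yields action names."""
    for index in range(total):
        yield f"action_{index}"


# Take the first three results; the generator is not advanced past them.
results = fake_query(1000)
first_three = list(itertools.islice(results, 3))

# The same generator continues where the slice stopped.
fourth = next(results)
```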
- property record: roboto.domain.actions.action_record.ActionRecord#
The underlying action record containing all action data.
- Return type:
roboto.domain.actions.action_record.ActionRecord
- property requires_downloaded_inputs: bool#
Whether input files should be downloaded before executing this action.
- Return type:
bool
- set_accessibility(accessibility)#
Set the accessibility level of this action.
Changes whether this action is private to the organization or published to the public Action Hub.
- Parameters:
accessibility (roboto.domain.actions.action_record.Accessibility) – The new accessibility level (Organization or ActionHub).
- Returns:
This Action instance with updated accessibility.
- Raises:
RobotoUnauthorizedException – If the caller lacks permission to modify the action.
- Return type:
Action
Examples
Make an action public in the Action Hub:
>>> from roboto.domain.actions import Accessibility
>>> action = Action.from_name("my_action")
>>> action.set_accessibility(Accessibility.ActionHub)
Make an action private to the organization:
>>> action.set_accessibility(Accessibility.Organization)
- property short_description: str | None#
A brief description of the action (max 140 characters) for display purposes.
- Return type:
Optional[str]
- property tags: list[str]#
The list of tags associated with this action for categorization.
- Return type:
list[str]
- property timeout: int | None#
The maximum execution time in minutes before the action is terminated.
- Return type:
Optional[int]
- to_dict()#
Convert this action to a dictionary representation.
- Returns:
Dictionary containing all action data in JSON-serializable format.
- Return type:
dict[str, Any]
Examples
Get action as dictionary:
>>> action = Action.from_name("my_action")
>>> action_dict = action.to_dict()
>>> print(action_dict["name"])
my_action
- update(compute_requirements=NotSet, container_parameters=NotSet, description=NotSet, inherits=NotSet, metadata_changeset=NotSet, parameter_changeset=NotSet, short_description=NotSet, timeout=NotSet, uri=NotSet, requires_downloaded_inputs=NotSet)#
Update this action with new configuration.
Updates the action with the provided changes. Only specified parameters will be modified; others remain unchanged. This creates a new version of the action.
- Parameters:
compute_requirements (Optional[Union[roboto.domain.actions.action_record.ComputeRequirements, roboto.sentinels.NotSetType]]) – New compute requirements (CPU, memory).
container_parameters (Optional[Union[roboto.domain.actions.action_record.ContainerParameters, roboto.sentinels.NotSetType]]) – New container parameters (image, entrypoint, etc.).
description (Optional[Union[str, roboto.sentinels.NotSetType]]) – New detailed description.
inherits (Optional[Union[roboto.domain.actions.action_record.ActionReference, roboto.sentinels.NotSetType]]) – New action reference to inherit from.
metadata_changeset (Union[roboto.updates.MetadataChangeset, roboto.sentinels.NotSetType]) – Changes to apply to metadata (add, remove, update keys).
parameter_changeset (Union[roboto.domain.actions.action_record.ActionParameterChangeset, roboto.sentinels.NotSetType]) – Changes to apply to parameters (add, remove, update).
short_description (Optional[Union[str, roboto.sentinels.NotSetType]]) – New brief description (max 140 characters).
timeout (Optional[Union[int, roboto.sentinels.NotSetType]]) – New maximum execution time in minutes.
uri (Optional[Union[str, roboto.sentinels.NotSetType]]) – New container image URI.
requires_downloaded_inputs (Union[bool, roboto.sentinels.NotSetType]) – Whether to download input files before execution.
- Returns:
This Action instance with updated configuration.
- Raises:
RobotoIllegalArgumentException – If short_description exceeds 140 characters or other validation errors occur.
RobotoUnauthorizedException – If the caller lacks permission to update the action.
- Return type:
Action
Examples
Update action description and timeout:
>>> action = Action.from_name("my_action")
>>> action.update(
...     description="Updated description of what this action does",
...     timeout=45
... )
Update compute requirements:
>>> from roboto.domain.actions import ComputeRequirements
>>> action.update(
...     compute_requirements=ComputeRequirements(vCPU=4096, memory=8192)
... )
Add metadata using changeset:
>>> from roboto.updates import MetadataChangeset
>>> changeset = MetadataChangeset().set("version", "2.0").set("team", "ml")
>>> action.update(metadata_changeset=changeset)
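The NotSet defaults in the update() signature make it possible to tell "argument omitted" apart from "argument explicitly set to None". The sketch below illustrates that sentinel pattern with local stand-ins; NotSetType, NOT_SET, and apply_update are hypothetical here and do not reproduce roboto.sentinels, and treating None as "clear the field" is an assumption suggested by the Optional type hints.

```python
from typing import Any, Optional, Union


class NotSetType:
    """Stand-in sentinel type meaning 'the caller did not pass this argument'."""

    def __repr__(self) -> str:
        return "NotSet"


NOT_SET = NotSetType()


def apply_update(
    current: dict[str, Any],
    description: Union[Optional[str], NotSetType] = NOT_SET,
    timeout: Union[Optional[int], NotSetType] = NOT_SET,
) -> dict[str, Any]:
    """Return a copy of `current` with only the supplied fields changed."""
    updated = dict(current)
    if not isinstance(description, NotSetType):
        updated["description"] = description  # may be None to clear the field
    if not isinstance(timeout, NotSetType):
        updated["timeout"] = timeout
    return updated


record = {"description": "old text", "timeout": 30}
only_timeout = apply_update(record, timeout=45)   # description untouched
cleared = apply_update(record, description=None)  # description cleared
```

A plain None default could not express the second call, because "omitted" and "set to None" would look identical inside the function.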
- property uri: str | None#
The container image URI for this action.
- Return type:
Optional[str]
- class roboto.domain.actions.ActionParameter(/, **data)#
Bases:
pydantic.BaseModel
A parameter that can be provided to an Action at invocation time.
Action parameters allow customization of action behavior without modifying the action itself. Parameters can be required or optional, have default values, and include descriptions for documentation.
Parameters are validated when an action is invoked, ensuring that required parameters are provided and that values conform to expected types.
- Parameters:
data (Any)
- default: Any | None = None#
Default value applied for parameter if it is not required and no value is given at invocation.
Accepts any default value, but the value is coerced to a string.
- description: str | None = None#
Human-readable description of the parameter.
- model_config#
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- name: str#
Name of the parameter.
- required: bool = False#
Whether this parameter is required at invocation time.
- classmethod validate_default(v)#
- Parameters:
v (Optional[Any])
- Return type:
Optional[str]
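The default field accepts any value but is stored as a string, and validate_default is typed to return Optional[str]. A minimal sketch of that coercion follows, assuming a plain str() conversion and that None passes through unchanged; the SDK's actual conversion rule is not shown on this page, and coerce_default is a hypothetical name.

```python
from typing import Any, Optional


def coerce_default(value: Optional[Any]) -> Optional[str]:
    """Hypothetical stand-in: stringify a default unless it is None."""
    if value is None:
        return None
    return str(value)


coerced_number = coerce_default(0.75)
coerced_flag = coerce_default(True)
coerced_missing = coerce_default(None)
```

One consequence is that action code reading a default value should parse it back into the type it expects.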
- class roboto.domain.actions.ActionParameterChangeset(/, **data)#
Bases:
pydantic.BaseModel
A changeset used to modify Action parameters.
- Parameters:
data (Any)
- class Builder#
- build()#
- Return type:
- put_parameter(parameter)#
- Parameters:
parameter (ActionParameter)
- Return type:
- remove_parameter(parameter_name)#
- Parameters:
parameter_name (str)
- Return type:
- is_empty()#
- Return type:
bool
- put_parameters: list[ActionParameter] = None#
Parameters to add or update.
- remove_parameters: list[str] = None#
Names of parameters to remove.
- class roboto.domain.actions.ActionProvenance(/, **data)#
Bases:
pydantic.BaseModel
Provenance information for an action.
- Parameters:
data (Any)
- digest: str | None = None#
- name: str#
- org_id: str#
- class roboto.domain.actions.ActionRecord(**kwargs)#
Bases:
pydantic.BaseModel
A wire-transmissible representation of an action.
- accessibility: Accessibility#
- compute_digest()#
- Return type:
str
- compute_requirements: ComputeRequirements | None = None#
- container_parameters: ContainerParameters | None = None#
- created: datetime.datetime#
- created_by: str#
- description: str | None = None#
- digest: str | None = None#
- inherits: ActionReference | None = None#
- metadata: dict[str, Any] = None#
- modified: datetime.datetime#
- modified_by: str#
- name: str#
- org_id: str#
- parameters: list[ActionParameter] = None#
- published: datetime.datetime | None = None#
- property reference: ActionReference#
- Return type:
ActionReference
- requires_downloaded_inputs: bool | None = None#
- serialize_metadata(metadata)#
- Parameters:
metadata (dict[str, Any])
- Return type:
roboto.types.UserMetadata
- short_description: str | None = None#
- tags: list[str] = None#
- timeout: int | None = None#
- uri: str | None = None#
- class roboto.domain.actions.ActionReference(/, **data)#
Bases:
pydantic.BaseModel
Qualified action reference.
- Parameters:
data (Any)
- digest: str | None = None#
- name: str#
- owner: str | None = None#
- class roboto.domain.actions.CancelActiveInvocationsRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to bulk cancel all active invocations within an organization.
This operation cancels multiple invocations in a single request, but only affects invocations that are in non-terminal states (Queued, Running, etc.). The operation is limited in the number of invocations it will attempt to cancel in a single call for performance reasons.
For large numbers of active invocations, continue calling this operation until the count_remaining returned in the response is 0.
- Parameters:
data (Any)
- created_before: datetime.datetime | None = None#
Only cancel invocations created before this timestamp.
If not provided, cancels all active invocations regardless of age.
- class roboto.domain.actions.CancelActiveInvocationsResponse(/, **data)#
Bases:
pydantic.BaseModel
Response payload from bulk cancellation of active invocations.
Contains the results of a bulk cancellation operation, including counts of successful and failed cancellations, and the number of invocations remaining to be cancelled.
- Parameters:
data (Any)
- count_remaining: int#
Number of invocations that are still active.
- failure_count: int#
Number of invocations that failed to cancel.
- success_count: int#
Number of invocations successfully cancelled.
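The guidance above, to keep calling the bulk cancel operation until count_remaining reaches 0, can be sketched as a loop. The cancel_batch callable and the CancelResult dataclass below are hypothetical stand-ins for whatever client call returns a CancelActiveInvocationsResponse; only the three field names come from this page.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class CancelResult:
    """Stand-in carrying the documented response fields."""

    success_count: int
    failure_count: int
    count_remaining: int


def cancel_all(cancel_batch: Callable[[], CancelResult], max_calls: int = 100) -> tuple[int, int]:
    """Call `cancel_batch` until nothing remains; return (succeeded, failed)."""
    succeeded = failed = 0
    for _ in range(max_calls):  # bound the loop in case counts never reach 0
        result = cancel_batch()
        succeeded += result.success_count
        failed += result.failure_count
        if result.count_remaining == 0:
            return succeeded, failed
    raise RuntimeError("active invocations remain after max_calls attempts")


def make_fake_backend(active: int, batch_size: int) -> Callable[[], CancelResult]:
    """Simulated service that cancels at most `batch_size` invocations per call."""
    state = {"active": active}

    def cancel_batch() -> CancelResult:
        cancelled = min(batch_size, state["active"])
        state["active"] -= cancelled
        return CancelResult(cancelled, 0, state["active"])

    return cancel_batch


totals = cancel_all(make_fake_backend(active=25, batch_size=10))
```

The max_calls bound is a design choice of this sketch: if some invocations repeatedly fail to cancel, count_remaining may never reach 0, and an unbounded loop would spin forever.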
- class roboto.domain.actions.ComputeRequirements(/, **data)#
Bases:
pydantic.BaseModel
Compute requirements for an action invocation.
- Parameters:
data (Any)
- gpu: Literal[False] = False#
- memory: int = 1024#
- model_config#
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- storage: int = 21#
- vCPU: int = 512#
- validate_storage_limit()#
- validate_vcpu_mem_combination()#
- Return type:
- class roboto.domain.actions.ContainerParameters(/, **data)#
Bases:
pydantic.BaseModel
Container parameters for an action invocation.
- Parameters:
data (Any)
- command: list[str] | None = None#
- entry_point: list[str] | None = None#
- env_vars: dict[str, str] | None = None#
- model_config#
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- workdir: str | None = None#
- class roboto.domain.actions.CreateActionRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to create a new action.
Contains all the configuration needed to create a new action in the Roboto platform, including container settings, compute requirements, parameters, and metadata.
- Parameters:
data (Any)
- compute_requirements: roboto.domain.actions.action_record.ComputeRequirements | None = None#
CPU, memory, and other compute specifications.
- container_parameters: roboto.domain.actions.action_record.ContainerParameters | None = None#
Container image URI, entrypoint, and environment variables.
- description: str | None = None#
Detailed description of what the action does.
- inherits: roboto.domain.actions.action_record.ActionReference | None = None#
Reference to another action to inherit configuration from.
- metadata: dict[str, Any] = None#
Custom key-value metadata to associate with the action.
- name: str#
Unique name for the action within the organization.
- parameters: list[roboto.domain.actions.action_record.ActionParameter] = None#
List of parameters that can be provided at invocation time.
- requires_downloaded_inputs: bool | None = None#
Whether input files should be downloaded before execution.
- short_description: str | None = None#
Brief description (max 140 characters) for display purposes.
- tags: list[str] = None#
List of tags for categorizing and searching actions.
- timeout: int | None = None#
Maximum execution time in minutes before the action is terminated.
- uri: str | None = None#
Container image URI if not inheriting from another action.
- class roboto.domain.actions.CreateInvocationRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to create a new action invocation.
Contains all the configuration needed to invoke an action, including input data specifications, parameter values, and execution overrides.
- Parameters:
data (Any)
- compute_requirement_overrides: roboto.domain.actions.action_record.ComputeRequirements | None = None#
Optional overrides for CPU, memory, and other compute specifications.
- container_parameter_overrides: roboto.domain.actions.action_record.ContainerParameters | None = None#
Optional overrides for container image, entrypoint, and environment variables.
- data_source_id: str#
ID of the data source providing input data.
- data_source_type: roboto.domain.actions.invocation_record.InvocationDataSourceType#
Type of the data source (e.g., Dataset).
- idempotency_id: str | None = None#
Optional unique ID to ensure the invocation runs exactly once.
- input_data: list[str]#
List of file patterns for input data selection.
- invocation_source: roboto.domain.actions.invocation_record.InvocationSource#
Source of the invocation (Manual, Trigger, etc.).
- invocation_source_id: str | None = None#
Optional ID of the entity that initiated the invocation.
- parameter_values: dict[str, Any] | None = None#
Optional parameter values to pass to the action.
- rich_input_data: roboto.domain.actions.invocation_record.InvocationInput | None = None#
Optional rich input data specification that supersedes the simple input_data patterns.
- timeout: int | None = None#
Optional timeout override in minutes.
- upload_destination: roboto.domain.actions.invocation_record.InvocationUploadDestination | None = None#
Optional destination for output files.
- class roboto.domain.actions.CreateTriggerRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to create a new trigger.
Contains all the configuration needed to create a trigger that automatically invokes actions when specific conditions are met.
- Parameters:
data (Any)
- action_digest: str | None = None#
Optional specific version digest of the action to invoke. If not provided, uses the latest version.
- action_name: str#
Name of the action to invoke when the trigger fires.
- action_owner_id: str | None = None#
Organization ID that owns the target action. If not provided, searches in the caller’s organization.
- additional_inputs: list[str] | None = None#
Optional additional file patterns to include in action invocations beyond the required inputs.
- causes: list[roboto.domain.actions.trigger_record.TriggerEvaluationCause] | None = None#
List of events that can cause this trigger to be evaluated. If not provided, uses default causes.
- compute_requirement_overrides: roboto.domain.actions.ComputeRequirements | None = None#
Optional compute requirement overrides for action invocations.
- condition: roboto.query.ConditionType | None = None#
Optional condition that must be met for the trigger to fire.
Can filter based on metadata, file properties, etc.
- container_parameter_overrides: roboto.domain.actions.ContainerParameters | None = None#
Optional container parameter overrides for action invocations.
- enabled: bool = True#
Whether the trigger should be active immediately after creation.
- for_each: roboto.domain.actions.trigger_record.TriggerForEachPrimitive#
Granularity of execution - Dataset or DatasetFile.
- name: str = None#
Unique name for the trigger (alphanumeric, hyphens, underscores only, max 256 characters).
- parameter_values: dict[str, Any] | None = None#
Parameter values to pass to the action when invoked.
- required_inputs: list[str]#
List of file patterns that must be present for the trigger to fire. Uses glob patterns like ‘**/*.bag’.
- service_user_id: str | None = None#
Optional service user ID for authentication.
- timeout: int | None = None#
Optional timeout override for action invocations in minutes.
- validate_additional_inputs(value)#
- Parameters:
value (Optional[list[str]])
- Return type:
Optional[list[str]]
- validate_required_inputs(value)#
- Parameters:
value (list[str])
- Return type:
list[str]
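The name field of CreateTriggerRequest is described as alphanumeric characters, hyphens, and underscores only, up to 256 characters. A client-side pre-check for that rule might look like the sketch below. The helper is_valid_trigger_name is hypothetical, the ASCII-only reading of "alphanumeric" and the minimum length of one character are assumptions, and the platform's own validation remains authoritative.

```python
import re

# ASCII letters, digits, hyphen, underscore; 1 to 256 characters.
_TRIGGER_NAME = re.compile(r"[A-Za-z0-9_-]{1,256}")


def is_valid_trigger_name(name: str) -> bool:
    """Hypothetical pre-check mirroring the documented naming rule."""
    return _TRIGGER_NAME.fullmatch(name) is not None


ok = is_valid_trigger_name("auto_process")
has_space = is_valid_trigger_name("auto process")
too_long = is_valid_trigger_name("a" * 257)
```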
- class roboto.domain.actions.DataSelector(/, **data)#
Bases:
pydantic.BaseModel
Selector for inputs (e.g. files) to an action invocation.
- Parameters:
data (Any)
- dataset_id: str | None = None#
Dataset ID, needed for backward compatibility purposes. Prefer RoboQL: dataset_id = <the ID in double quotes>
- ensure_not_empty()#
- Return type:
- ids: list[str] | None = None#
Specific input IDs (e.g. a dataset ID).
- names: list[str] | None = None#
Specific input names (e.g. topic names).
- query: str | None = None#
RoboQL query representing the desired inputs.
- class roboto.domain.actions.EvaluateTriggersRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to manually evaluate specific triggers.
Used to force evaluation of triggers outside of their normal automatic evaluation cycle. This is typically used for testing or debugging trigger behavior.
Note
This is primarily used internally by the Roboto platform and is not commonly needed by SDK users.
- Parameters:
data (Any)
- trigger_evaluation_ids: collections.abc.Iterable[int]#
Collection of trigger evaluation IDs to process.
- class roboto.domain.actions.ExecutableProvenance(/, **data)#
Bases:
pydantic.BaseModel
Provenance information for an action executable
- Parameters:
data (Any)
- container_image_digest: str | None = None#
- container_image_uri: str | None = None#
- class roboto.domain.actions.ExecutorContainer#
Bases:
enum.Enum
Type of container running as part of an action invocation
- Action = 'action'#
- LogRouter = 'firelens_log_router'#
- Monitor = 'monitor'#
- OutputHandler = 'output_handler'#
- Setup = 'setup'#
- class roboto.domain.actions.FileSelector(/, **data)#
Bases:
DataSelector
Selector for file inputs to an action invocation.
This selector type exists for backward compatibility purposes. We encourage you to use the query field to scope your input query to any dataset and/or file paths.
- Parameters:
data (Any)
- ensure_not_empty()#
- Return type:
- paths: list[str] | None = None#
File paths or patterns. Prefer RoboQL: path LIKE <path pattern in double quotes>
- class roboto.domain.actions.Invocation(record, roboto_client=None)#
An instance of an execution of an action, initiated manually by a user or automatically by a trigger.
An Invocation represents a single execution of an Action with specific inputs, parameters, and configuration. It tracks the execution lifecycle from creation through completion, including status updates, logs, and results.
Invocations are created by calling Action.invoke() or through the UI. They cannot be created directly through the constructor. Each invocation has a unique ID and maintains a complete audit trail of its execution.
Key features:
Status tracking (Queued, Running, Completed, Failed, etc.)
Input data specification and parameter values
Compute requirement and container parameter overrides
Log collection and output file management
Progress monitoring and result retrieval
- Parameters:
record (roboto.domain.actions.invocation_record.InvocationRecord)
roboto_client (Optional[roboto.http.RobotoClient])
- property action: roboto.domain.actions.invocation_record.ActionProvenance#
Provenance information about the action that was invoked.
- cancel()#
Cancel this invocation if it is not already in a terminal status.
Attempts to cancel the invocation. If the invocation has already completed, failed, or reached another terminal status, this method has no effect.
- Raises:
RobotoNotFoundException – If the invocation is not found.
RobotoUnauthorizedException – If the caller lacks permission to cancel the invocation.
- Return type:
None
Examples
Cancel a running invocation:
>>> invocation = Invocation.from_id("iv_12345")
>>> if not invocation.reached_terminal_status:
...     invocation.cancel()
- property compute_requirements: roboto.domain.actions.action_record.ComputeRequirements#
The compute requirements (CPU, memory) used for this invocation.
- property container_parameters: roboto.domain.actions.action_record.ContainerParameters#
The container parameters used for this invocation.
- property created: datetime.datetime#
The timestamp when this invocation was created.
- Return type:
datetime.datetime
- property current_status: roboto.domain.actions.invocation_record.InvocationStatus#
The current status of this invocation (e.g., Queued, Running, Completed).
- property data_source: roboto.domain.actions.invocation_record.InvocationDataSource#
The data source that provided input data for this invocation.
- property executable: roboto.domain.actions.invocation_record.ExecutableProvenance#
Provenance information about the executable (container) that was run.
- classmethod from_id(invocation_id, roboto_client=None)#
Load an existing invocation by its ID.
Retrieves an invocation from the Roboto platform using its unique identifier.
- Parameters:
invocation_id (str) – The unique ID of the invocation to retrieve.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses default if not provided.
- Returns:
The Invocation instance.
- Raises:
RobotoNotFoundException – If the invocation is not found.
RobotoUnauthorizedException – If the caller lacks permission to access the invocation.
- Return type:
Invocation
Examples
Load an invocation and check its status:
>>> invocation = Invocation.from_id("iv_12345")
>>> print(f"Status: {invocation.current_status}")
>>> print(f"Created: {invocation.created}")
- get_logs(page_token=None)#
Retrieve runtime STDOUT/STDERR logs generated during this invocation’s execution.
Fetches log records from the invocation’s container execution, with support for pagination to handle large log volumes.
- Parameters:
page_token (Optional[str]) – Optional token for pagination. If provided, starts retrieving logs from that point.
- Yields:
LogRecord instances containing log messages and metadata.
- Raises:
RobotoNotFoundException – If the invocation is not found.
RobotoUnauthorizedException – If the caller lacks permission to access logs.
- Return type:
collections.abc.Generator[roboto.domain.actions.invocation_record.LogRecord, None, None]
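The token-based pagination described for get_logs can be pictured with a small stand-in. This is a sketch only: `fetch_page`, `iter_logs`, and the `FAKE_PAGES` data are hypothetical and are not part of the Roboto SDK. The real method does the paging itself and yields LogRecord instances.

```python
from typing import Callable, Iterator, Optional

# Hypothetical paged backend: token -> (records, next_token). Not a Roboto API.
FAKE_PAGES = {
    None: (["line 1", "line 2"], "page-2"),
    "page-2": (["line 3"], None),
}


def fetch_page(page_token: Optional[str]) -> tuple[list[str], Optional[str]]:
    """Return one page of records plus the token of the next page, if any."""
    return FAKE_PAGES[page_token]


def iter_logs(
    fetch: Callable[[Optional[str]], tuple[list[str], Optional[str]]],
    page_token: Optional[str] = None,
) -> Iterator[str]:
    """Yield records page by page until the backend returns no next token."""
    while True:
        records, page_token = fetch(page_token)
        yield from records
        if page_token is None:
            break


all_lines = list(iter_logs(fetch_page))
resumed = list(iter_logs(fetch_page, page_token="page-2"))
```

Passing a token resumes from that page, which mirrors how the page_token parameter is described above.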
- property id: str#
The unique identifier for this invocation.
- Return type:
str
- property input_data: roboto.domain.actions.invocation_record.InvocationInput | None#
The input data specification for this invocation, if any.
- Return type:
Optional[roboto.domain.actions.invocation_record.InvocationInput]
- is_queued_for_scheduling()#
- An invocation is queued for scheduling if:
1. its most recent status is “Queued”
2. and it is not “Deadly”
- Return type:
bool
- property org_id: str#
The organization ID that owns this invocation.
- Return type:
str
- property parameter_values: dict[str, Any]#
The parameter values that were provided when this invocation was created.
- Return type:
dict[str, Any]
- classmethod query(spec=None, owner_org_id=None, roboto_client=None)#
Query invocations with optional filtering and pagination.
Searches for invocations based on the provided query specification. Can filter by status, action name, creation time, and other attributes.
- Parameters:
spec (Optional[roboto.query.QuerySpecification]) – Query specification with filters, sorting, and pagination. If not provided, returns all accessible invocations.
owner_org_id (Optional[str]) – Organization ID to search within. If not provided, searches in the caller’s organization.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses default if not provided.
- Yields:
Invocation instances matching the query criteria.
- Raises:
ValueError – If the query specification contains unknown fields.
RobotoUnauthorizedException – If the caller lacks permission to query invocations.
- Return type:
collections.abc.Generator[Invocation, None, None]
Examples
Query all invocations:
>>> for invocation in Invocation.query():
...     print(f"Invocation: {invocation.id}")
Query completed invocations:
>>> from roboto.query import QuerySpecification
>>> from roboto.domain.actions import InvocationStatus
>>> spec = QuerySpecification().where("current_status").equals(InvocationStatus.Completed)
>>> completed = list(Invocation.query(spec))
Query recent invocations:
>>> spec = QuerySpecification().order_by("created", ascending=False).limit(10)
>>> recent = list(Invocation.query(spec))
- property reached_terminal_status: bool#
True if this invocation has reached a terminal status (Completed, Failed, etc.).
- Return type:
bool
- property record: roboto.domain.actions.invocation_record.InvocationRecord#
The underlying invocation record containing all invocation data.
- refresh()#
- Return type:
- set_container_image_digest(digest)#
This is an admin-only operation to memorialize the digest of the container image that was pulled in the course of invoking the action.
- Parameters:
digest (str)
- Return type:
- set_logs_location(logs)#
This is an admin-only operation to memorialize the base location where invocation logs are saved.
Use the “get_logs” or “stream_logs” methods to access invocation logs.
- Parameters:
- Return type:
- property source: roboto.domain.actions.invocation_record.SourceProvenance#
Provenance information about the source that initiated this invocation.
- property status_log: list[roboto.domain.actions.invocation_record.InvocationStatusRecord]#
The complete history of status changes for this invocation.
- Return type:
list[roboto.domain.actions.invocation_record.InvocationStatusRecord]
- stream_logs(last_read=None)#
- Parameters:
last_read (Optional[str])
- Return type:
collections.abc.Generator[roboto.domain.actions.invocation_record.LogRecord, None, Optional[str]]
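The return type of stream_logs is a generator that both yields records and returns a final Optional[str]. A generator's return value is not visible to a plain for loop. It is carried on StopIteration.value. The sketch below shows how to capture it. `fake_stream` and `drain` are hypothetical stand-ins, and treating the returned value as a marker to pass back as last_read is an assumption, since the reference does not describe it.

```python
from typing import Generator, Optional


def fake_stream(last_read: Optional[str] = None) -> Generator[str, None, Optional[str]]:
    """Hypothetical stand-in with the same generator shape as stream_logs."""
    lines = ["a", "b", "c"]
    start = 0 if last_read is None else lines.index(last_read) + 1
    marker = last_read
    for line in lines[start:]:
        marker = line
        yield line
    # A generator's return value travels on StopIteration.value.
    return marker


def drain(gen):
    """Collect every yielded item and the generator's return value."""
    items = []
    while True:
        try:
            items.append(next(gen))
        except StopIteration as stop:
            return items, stop.value


first_items, marker = drain(fake_stream())
later_items, later_marker = drain(fake_stream(last_read=marker))
```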
- property timeout: int#
The timeout in minutes for this invocation.
- Return type:
int
- to_dict()#
- Return type:
dict[str, Any]
- update_status(next_status, detail=None)#
- Parameters:
next_status (roboto.domain.actions.invocation_record.InvocationStatus)
detail (Optional[str])
- Return type:
- property upload_destination: roboto.domain.actions.invocation_record.InvocationUploadDestination | None#
The destination where output files from this invocation will be uploaded.
- Return type:
Optional[roboto.domain.actions.invocation_record.InvocationUploadDestination]
- wait_for_terminal_status(timeout=60 * 5, poll_interval=5)#
Wait for the invocation to reach a terminal status.
Throws a TimeoutError if the timeout is reached.
- Parameters:
timeout (float) – The maximum amount of time, in seconds, to wait for the invocation to reach a terminal status.
poll_interval (roboto.waiters.Interval) – The amount of time, in seconds, to wait between polling iterations.
- Return type:
None
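The timeout and poll_interval parameters describe a polling loop. A minimal sketch of that pattern follows, assuming a plain float interval rather than roboto.waiters.Interval. `wait_until` and `reached_terminal_status` are hypothetical names, not SDK functions, and the SDK's own loop may differ.

```python
import time
from typing import Callable


def wait_until(done: Callable[[], bool], timeout: float = 300.0, poll_interval: float = 5.0) -> None:
    """Poll `done` until it returns True; raise TimeoutError once `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while not done():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(poll_interval)


# Hypothetical status source that reports a terminal status on the third poll.
polls = {"count": 0}


def reached_terminal_status() -> bool:
    polls["count"] += 1
    return polls["count"] >= 3


wait_until(reached_terminal_status, timeout=1.0, poll_interval=0.01)

# A condition that never becomes true ends in TimeoutError.
timed_out = False
try:
    wait_until(lambda: False, timeout=0.05, poll_interval=0.01)
except TimeoutError:
    timed_out = True
```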
- class roboto.domain.actions.InvocationDataSource(/, **data)#
Bases:
pydantic.BaseModel
Abstracted data source that can be provided to an invocation.
Represents a source of input data for action invocations. The data source type determines how the ID should be interpreted (e.g., as a dataset ID).
- Parameters:
data (Any)
- data_source_id: str#
The ID of the data source. For Dataset type, this is a dataset ID.
- data_source_type: InvocationDataSourceType#
The type of data source (currently only Dataset).
- is_unspecified()#
Check if this data source is unspecified.
- Returns:
True if this is an unspecified data source, False otherwise.
- Return type:
bool
- static unspecified()#
Returns a special value indicating that no data source is specified.
- Returns:
An InvocationDataSource instance representing an unspecified data source.
- Return type:
InvocationDataSource
- class roboto.domain.actions.InvocationDataSourceType#
Bases:
enum.Enum
Source of data for an action’s input binding.
Defines the type of data source that provides input data to an action invocation. Currently supports datasets, with potential for future expansion to other data source types.
- Dataset = 'Dataset'#
- class roboto.domain.actions.InvocationInput(/, **data)#
Bases:
pydantic.BaseModel
Input specification for an action invocation.
An invocation may require no inputs at all, or some combination of Roboto files, topics, events, etc. Those are specified using selectors, which tell the invocation how to locate inputs. Selector choices include RoboQL queries (for maximum flexibility), as well as unique IDs or friendly names.
Note: support for certain input types is a work in progress, and will be offered in future Roboto platform releases.
At least one data selector must be provided in order to construct a valid InvocationInput instance.
- Parameters:
data (Any)
- ensure_not_empty()#
- Return type:
- property file_paths: list[str]#
- Return type:
list[str]
- files: FileSelector | list[FileSelector] | None = None#
File selectors.
- classmethod from_dataset_file_paths(dataset_id, file_paths)#
- Parameters:
dataset_id (str)
file_paths (list[str])
- Return type:
- property safe_files: list[FileSelector]#
- Return type:
list[FileSelector]
- property safe_topics: list[DataSelector]#
- Return type:
list[DataSelector]
- topics: DataSelector | list[DataSelector] | None = None#
Topic selectors.
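The rule that at least one data selector must be provided can be sketched with a plain dataclass. `InputSketch` is a hypothetical stand-in, not the pydantic model. Selectors are reduced to RoboQL strings, and the check shown is an assumption about what ensure_not_empty enforces.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class InputSketch:
    """Hypothetical stand-in for InvocationInput; selectors are plain strings here."""

    files: Optional[list[str]] = None
    topics: Optional[list[str]] = None

    def __post_init__(self) -> None:
        # Reject an input specification that selects nothing at all.
        if not self.files and not self.topics:
            raise ValueError("at least one file or topic selector is required")


ok = InputSketch(files=['dataset_id = "ds_12345"'])

try:
    InputSketch()
    rejected = False
except ValueError:
    rejected = True
```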
- class roboto.domain.actions.InvocationProvenance(/, **data)#
Bases:
pydantic.BaseModel
Provenance information for an invocation
- Parameters:
data (Any)
- action: ActionProvenance#
The Action that was invoked.
- executable: ExecutableProvenance#
The underlying executable (e.g., Docker image) that was run.
- source: SourceProvenance#
The source of the invocation.
- class roboto.domain.actions.InvocationRecord(/, **data)#
Bases:
pydantic.BaseModel
A wire-transmissible representation of an invocation.
- Parameters:
data (Any)
- compute_requirements: roboto.domain.actions.action_record.ComputeRequirements#
- container_parameters: roboto.domain.actions.action_record.ContainerParameters#
- created: datetime.datetime#
- data_source: InvocationDataSource#
- duration: datetime.timedelta = None#
- idempotency_id: str | None = None#
- input_data: list[str]#
- invocation_id: str#
- last_heartbeat: datetime.datetime | None = None#
- last_status: InvocationStatus#
- org_id: str#
- parameter_values: dict[str, Any] = None#
- provenance: InvocationProvenance#
- rich_input_data: InvocationInput | None = None#
- status: list[InvocationStatusRecord] = None#
- timeout: int#
- upload_destination: InvocationUploadDestination | None = None#
- class roboto.domain.actions.InvocationSource#
Bases:
enum.Enum
Method by which an invocation was run
- Manual = 'Manual'#
- Trigger = 'Trigger'#
- class roboto.domain.actions.InvocationStatus#
Bases:
int, enum.Enum
Invocation status enum
- Cancelled = 997#
- Completed = 5#
- Deadly = 999#
- Downloading = 2#
- Failed = 998#
- Processing = 3#
- Queued = 0#
- Scheduled = 1#
- Uploading = 4#
- can_transition_to(other)#
- Parameters:
other (InvocationStatus)
- Return type:
bool
- static from_value(v)#
- Parameters:
v (Union[int, str])
- Return type:
- is_running()#
- Return type:
bool
- is_terminal()#
- Return type:
bool
- next()#
- Return type:
Optional[InvocationStatus]
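Because InvocationStatus mixes in int, its members compare equal to their numeric values. The sketch below mirrors the documented values in a local enum to show that behaviour. `Status`, `ASSUMED_TERMINAL`, and this `from_value` are hypothetical. The set of terminal statuses and the handling of string input are assumptions, not taken from the SDK.

```python
import enum


class Status(int, enum.Enum):
    """Local mirror of the documented InvocationStatus values (not the SDK class)."""

    Queued = 0
    Scheduled = 1
    Downloading = 2
    Processing = 3
    Uploading = 4
    Completed = 5
    Cancelled = 997
    Failed = 998
    Deadly = 999


# Assumption: these four are the terminal statuses.
ASSUMED_TERMINAL = {Status.Completed, Status.Cancelled, Status.Failed, Status.Deadly}


def from_value(v):
    """Accept an int value or a member name; loosely modeled on from_value(v)."""
    return Status[v] if isinstance(v, str) else Status(v)


by_int = from_value(998)
by_name = from_value("Queued")
failed_is_terminal = by_int in ASSUMED_TERMINAL
queued_is_terminal = by_name in ASSUMED_TERMINAL
```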
- class roboto.domain.actions.InvocationStatusRecord(/, **data)#
Bases:
pydantic.BaseModel
A wire-transmissible representation of an invocation status.
- Parameters:
data (Any)
- detail: str | None = None#
- status: InvocationStatus#
- timestamp: datetime.datetime#
- to_presentable_dict()#
- Return type:
dict[str, Optional[str]]
- class roboto.domain.actions.InvocationUploadDestination(/, **data)#
Bases:
pydantic.BaseModel
Default destination to which invocation outputs - if any - should be uploaded.
Specifies where files generated during action execution should be stored. Actions can write files to their output directory, and this destination determines where those files are uploaded after execution completes.
- Parameters:
data (Any)
- classmethod dataset(dataset_id)#
Create a dataset upload destination with the given ID.
- Parameters:
dataset_id (str) – The ID of the dataset where outputs should be uploaded.
- Returns:
An InvocationUploadDestination configured for the specified dataset.
- Return type:
InvocationUploadDestination
- destination_id: str | None = None#
Optional identifier for the upload destination. In the case of a dataset, it would be the dataset ID.
- destination_type: UploadDestinationType#
Type of upload destination. By default, outputs are uploaded to a dataset.
- property is_dataset: bool#
True if this is a dataset destination with a dataset ID, False otherwise.
- Returns:
True if this destination is configured for a dataset with a valid ID.
- Return type:
bool
- property is_unknown: bool#
True if the upload destination is not of a supported type, False otherwise.
- Return type:
bool
- classmethod pre_validate_destination_type(value)#
- Parameters:
value (Any)
- Return type:
Any
- class roboto.domain.actions.LogRecord(/, **data)#
Bases:
pydantic.BaseModel
A wire-transmissible representation of a log record.
- Parameters:
data (Any)
- log: str#
- partial_id: str | None = None#
- timestamp: datetime.datetime#
- class roboto.domain.actions.LogsLocation(/, **data)#
Bases:
pydantic.BaseModel
Invocation log storage location
- Parameters:
data (Any)
- bucket: str#
- prefix: str#
- class roboto.domain.actions.QueryTriggersRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to query triggers with filters.
Used to search for triggers based on various criteria such as name, status, or other attributes.
- Parameters:
data (Any)
- filters: dict[str, Any] = None#
Dictionary of filter criteria to apply to the trigger search.
- model_config#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class roboto.domain.actions.SetActionAccessibilityRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to set action accessibility.
Used to change whether an action is private to the organization or published publicly in the Action Hub.
- Parameters:
data (Any)
- accessibility: roboto.domain.actions.action_record.Accessibility#
The new accessibility level (Organization or ActionHub).
- digest: str | None = None#
Specific version of Action. If not specified, the latest version’s accessibility will be updated.
- model_config#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class roboto.domain.actions.SetContainerInfoRequest(/, **data)#
Bases:
pydantic.BaseModel
Request to set container information for an invocation.
Used internally by the Roboto platform to record container details after the action image has been pulled and inspected.
Note
This is typically called from the monitor process and is not intended for direct use by SDK users.
- Parameters:
data (Any)
- image_digest: str#
The digest of the container image that was pulled.
- class roboto.domain.actions.SetLogsLocationRequest(/, **data)#
Bases:
pydantic.BaseModel
Request to set the location where invocation logs are stored.
Used internally by the Roboto platform to record where log files are saved for later retrieval.
Note
This is typically called from the invocation-scheduling-service and is not intended for direct use by SDK users.
- Parameters:
data (Any)
- bucket: str#
S3 bucket name where logs are stored.
- prefix: str#
S3 key prefix for the log files.
- class roboto.domain.actions.SourceProvenance(/, **data)#
Bases:
pydantic.BaseModel
Provenance information for an invocation source
- Parameters:
data (Any)
- source_id: str#
- source_type: InvocationSource#
- class roboto.domain.actions.Trigger(record, roboto_client=None)#
A rule that automatically invokes an action when specific events or conditions occur.
Triggers enable automated data processing workflows by monitoring for specific events (like new datasets being created) and automatically invoking actions when conditions are met. They eliminate the need for manual intervention in routine data processing tasks.
Triggers can be configured to:
Monitor for new datasets, files, or other data sources
Apply conditional logic to determine when to execute
Specify input data patterns and action parameters
Override compute requirements and container parameters
Execute actions for each matching item or in batch
A trigger consists of:
Target action to invoke
Input data requirements and patterns
Execution conditions and causes
Parameter values and overrides
Scheduling and execution settings
- Parameters:
roboto_client (Optional[roboto.http.RobotoClient])
- property condition: roboto.query.ConditionType | None#
- Return type:
Optional[roboto.query.ConditionType]
- classmethod create(name, action_name, required_inputs, for_each, enabled=True, action_digest=None, action_owner_id=None, additional_inputs=None, causes=None, compute_requirement_overrides=None, condition=None, container_parameter_overrides=None, parameter_values=None, service_user_id=None, timeout=None, caller_org_id=None, roboto_client=None)#
Create a new trigger that automatically invokes an action when conditions are met.
Creates a trigger that monitors for specific events (like new datasets or files) and automatically invokes the specified action when the trigger conditions are satisfied. This enables automated data processing workflows.
- Parameters:
name (str) – Unique name for the trigger within the organization.
action_name (str) – Name of the action to invoke when the trigger fires.
required_inputs (list[str]) – List of file patterns that must be present for the trigger to fire. Uses glob patterns like “**/*.bag” or “data/*.csv”.
for_each (roboto.domain.actions.trigger_record.TriggerForEachPrimitive) – Granularity of execution - Dataset creates one invocation per dataset, DatasetFile creates one invocation per matching file.
enabled (bool) – Whether the trigger should be active immediately after creation.
action_digest (Optional[str]) – Specific version digest of the action to invoke. If not provided, uses the latest version.
action_owner_id (Optional[str]) – Organization ID that owns the target action. If not provided, searches in the caller’s organization.
additional_inputs (Optional[list[str]]) – Optional additional file patterns to include in invocations.
causes (Optional[list[roboto.domain.actions.trigger_record.TriggerEvaluationCause]]) – List of events that can cause this trigger to be evaluated. If not provided, uses default causes.
compute_requirement_overrides (Optional[roboto.domain.actions.invocation_record.ComputeRequirements]) – Optional compute requirement overrides for action invocations.
condition (Optional[roboto.query.ConditionType]) – Optional condition that must be met for the trigger to fire. Can filter based on metadata, file properties, etc.
container_parameter_overrides (Optional[roboto.domain.actions.invocation_record.ContainerParameters]) – Optional container parameter overrides for action invocations.
parameter_values (Optional[dict[str, Any]]) – Parameter values to pass to the action when invoked.
service_user_id (Optional[str]) – Optional service user ID for authentication.
timeout (Optional[int]) – Optional timeout override for action invocations in minutes.
caller_org_id (Optional[str]) – Organization ID to create the trigger in. Defaults to caller’s org.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses default if not provided.
- Returns:
The newly created Trigger instance.
- Raises:
RobotoIllegalArgumentException – If the trigger configuration is invalid.
RobotoInvalidRequestException – If the request is malformed.
RobotoUnauthorizedException – If the caller lacks permission to create triggers.
- Return type:
Trigger
Examples
Create a simple trigger for ROS bag files:
>>> from roboto.domain.actions import Trigger, TriggerForEachPrimitive
>>> trigger = Trigger.create(
...     name="auto_process_bags",
...     action_name="ros_ingestion",
...     required_inputs=["**/*.bag"],
...     for_each=TriggerForEachPrimitive.Dataset
... )
Create a conditional trigger with parameters:
>>> from roboto.query import Condition
>>> condition = Condition("metadata.sensor_type").equals("lidar")
>>> trigger = Trigger.create(
...     name="lidar_processing",
...     action_name="lidar_processor",
...     required_inputs=["**/*.pcd"],
...     for_each=TriggerForEachPrimitive.Dataset,
...     condition=condition,
...     parameter_values={"resolution": "high", "filter": "statistical"}
... )
Create a trigger with compute overrides:
>>> from roboto.domain.actions import ComputeRequirements
>>> trigger = Trigger.create(
...     name="heavy_processing",
...     action_name="ml_inference",
...     required_inputs=["**/*.jpg", "**/*.png"],
...     for_each=TriggerForEachPrimitive.DatasetFile,
...     compute_requirement_overrides=ComputeRequirements(vCPU=8192, memory=16384)
... )
- property created: datetime.datetime#
- Return type:
datetime.datetime
- property created_by: str#
- Return type:
str
- delete()#
- disable()#
- enable()#
- property enabled: bool#
- Return type:
bool
- property for_each: roboto.domain.actions.trigger_record.TriggerForEachPrimitive#
- classmethod from_name(name, owner_org_id=None, roboto_client=None)#
- Parameters:
name (str)
owner_org_id (Optional[str])
roboto_client (Optional[roboto.http.RobotoClient])
- Return type:
- get_action()#
- Return type:
- get_evaluations(limit=None, page_token=None)#
- Parameters:
limit (Optional[int])
page_token (Optional[str])
- Return type:
collections.abc.Generator[roboto.domain.actions.trigger_record.TriggerEvaluationRecord, None, None]
- static get_evaluations_for_dataset(dataset_id, owner_org_id=None, roboto_client=None)#
Get all trigger evaluations for a specific dataset.
Retrieves the history of trigger evaluations that were performed for a given dataset, including successful invocations and failed attempts.
- Parameters:
dataset_id (str) – The ID of the dataset to get evaluations for.
owner_org_id (Optional[str]) – Organization ID that owns the dataset. If not provided, searches in the caller’s organization.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses default if not provided.
- Yields:
TriggerEvaluationRecord instances for the dataset.
- Raises:
RobotoNotFoundException – If the dataset is not found.
RobotoUnauthorizedException – If the caller lacks permission to access evaluations.
- Return type:
collections.abc.Generator[roboto.domain.actions.trigger_record.TriggerEvaluationRecord, None, None]
Examples
Get all evaluations for a dataset:
>>> for evaluation in Trigger.get_evaluations_for_dataset("ds_12345"):
...     print(f"Trigger: {evaluation.trigger_id}, Status: {evaluation.status}")
Count the evaluations that ran to completion for a dataset:
>>> from roboto.domain.actions import TriggerEvaluationStatus
>>> evaluations = list(Trigger.get_evaluations_for_dataset("ds_12345"))
>>> evaluated = [e for e in evaluations if e.status == TriggerEvaluationStatus.Evaluated]
>>> print(f"Found {len(evaluated)} completed trigger evaluations")
- get_invocations()#
- Return type:
collections.abc.Generator[roboto.domain.actions.invocation.Invocation, None, None]
- invoke(data_source, idempotency_id=None, input_data_override=None, upload_destination=None)#
- Parameters:
data_source (roboto.domain.actions.invocation_record.InvocationDataSource)
idempotency_id (Optional[str])
input_data_override (Optional[list[str]])
upload_destination (Optional[roboto.domain.actions.invocation_record.InvocationUploadDestination])
- Return type:
- latest_evaluation()#
- Return type:
Optional[roboto.domain.actions.trigger_record.TriggerEvaluationRecord]
- property modified: datetime.datetime#
- Return type:
datetime.datetime
- property modified_by: str#
- Return type:
str
- property name#
- property org_id#
- classmethod query(spec=None, owner_org_id=None, roboto_client=None)#
- Parameters:
spec (Optional[roboto.query.QuerySpecification])
owner_org_id (Optional[str])
roboto_client (Optional[roboto.http.RobotoClient])
- Return type:
collections.abc.Generator[Trigger, None, None]
- property record: roboto.domain.actions.trigger_record.TriggerRecord#
- Return type:
- property service_user_id: str | None#
- Return type:
Optional[str]
- to_dict()#
- Return type:
dict[str, Any]
- property trigger_id: str#
- Return type:
str
- update(action_name=NotSet, action_owner_id=NotSet, action_digest=NotSet, additional_inputs=NotSet, causes=NotSet, compute_requirement_overrides=NotSet, container_parameter_overrides=NotSet, condition=NotSet, enabled=NotSet, for_each=NotSet, parameter_values=NotSet, required_inputs=NotSet, timeout=NotSet)#
- Parameters:
action_name (Union[str, roboto.sentinels.NotSetType])
action_owner_id (Union[str, roboto.sentinels.NotSetType])
action_digest (Optional[Union[str, roboto.sentinels.NotSetType]])
additional_inputs (Optional[Union[list[str], roboto.sentinels.NotSetType]])
causes (Union[list[roboto.domain.actions.trigger_record.TriggerEvaluationCause], roboto.sentinels.NotSetType])
compute_requirement_overrides (Optional[Union[roboto.domain.actions.invocation_record.ComputeRequirements, roboto.sentinels.NotSetType]])
container_parameter_overrides (Optional[Union[roboto.domain.actions.invocation_record.ContainerParameters, roboto.sentinels.NotSetType]])
condition (Optional[Union[roboto.query.ConditionType, roboto.sentinels.NotSetType]])
enabled (Union[bool, roboto.sentinels.NotSetType])
for_each (Union[roboto.domain.actions.trigger_record.TriggerForEachPrimitive, roboto.sentinels.NotSetType])
parameter_values (Optional[Union[dict[str, Any], roboto.sentinels.NotSetType]])
required_inputs (Union[list[str], roboto.sentinels.NotSetType])
timeout (Optional[Union[int, roboto.sentinels.NotSetType]])
- Return type:
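Every parameter of update defaults to NotSet, a sentinel that lets the method tell "leave this field alone" apart from an explicit None. The sketch below illustrates the pattern with a hypothetical local sentinel and an `apply_update` helper. Neither is SDK code, and treating None as "clear the field" is an assumption suggested by the Optional[...] parameter types.

```python
from typing import Any


class _NotSetType:
    """Hypothetical sentinel type, standing in for roboto.sentinels.NotSetType."""

    def __repr__(self) -> str:
        return "NotSet"


NotSet = _NotSetType()


def apply_update(current: dict[str, Any], **changes: Any) -> dict[str, Any]:
    """Return a copy of `current` with every change that is not NotSet applied."""
    updated = dict(current)
    for key, value in changes.items():
        if value is not NotSet:
            # None is a real value here: it overwrites (clears) the field.
            updated[key] = value
    return updated


record = {"enabled": True, "timeout": 30}
result = apply_update(record, enabled=NotSet, timeout=None)
```

Here `enabled` is left untouched because it was NotSet, while `timeout` is overwritten with None.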
- wait_for_evaluations_to_complete(timeout=60 * 5, poll_interval=5)#
Wait for all evaluations for this trigger to complete.
Throws a TimeoutError if the timeout is reached.
- Parameters:
timeout (float) – The maximum amount of time, in seconds, to wait for the evaluations to complete.
poll_interval (roboto.waiters.Interval) – The amount of time, in seconds, to wait between polling iterations.
- Return type:
None
- class roboto.domain.actions.TriggerEvaluationCause#
Bases:
enum.Enum
The cause of a TriggerEvaluationRecord is the reason why the trigger was selected for evaluation.
Represents the specific event that caused a trigger to be evaluated for potential execution. Different causes may result in different trigger behavior or input data selection.
- DatasetMetadataUpdate = 'dataset_metadata_update'#
Trigger evaluation caused by changes to dataset metadata.
- FileIngest = 'file_ingest'#
Trigger evaluation caused by files being ingested into a dataset.
- FileUpload = 'file_upload'#
Trigger evaluation caused by new files being uploaded to a dataset.
- class roboto.domain.actions.TriggerEvaluationDataConstraint(/, **data)#
Bases:
pydantic.BaseModel
An optional filtering constraint applied to the data considered by a trigger evaluation.
Each trigger evaluation considers data of a particular data source ID and data source type. Typically (and to start, exclusively), this is a dataset ID (and type Dataset).
In the naive case before the introduction of this class, trigger evaluation for a dataset with 20k files would have to scan each file. This constraint allows us to filter the data to a subset during evaluation, e.g. only evaluate files from dataset ds_12345 with upload ID tx_123abc
- Parameters:
data (Any)
- transaction_id: str | None = None#
If set, only consider files from this upload.
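The effect of a transaction_id constraint can be sketched as a filter over a file listing. The `files` data and `files_to_consider` are hypothetical and only illustrate the narrowing described above. They do not show how the platform evaluates triggers.

```python
from typing import Optional

# Hypothetical file listing: (path, upload transaction ID). Not SDK objects.
files = [
    ("a.bag", "tx_123abc"),
    ("b.bag", "tx_999zzz"),
    ("c.bag", "tx_123abc"),
]


def files_to_consider(
    listing: list[tuple[str, str]], transaction_id: Optional[str] = None
) -> list[str]:
    """Return every path, or only paths from one upload when a constraint is set."""
    if transaction_id is None:
        return [path for path, _ in listing]
    return [path for path, tx in listing if tx == transaction_id]


unconstrained = files_to_consider(files)
constrained = files_to_consider(files, transaction_id="tx_123abc")
```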
- class roboto.domain.actions.TriggerEvaluationOutcome#
Bases:
enum.Enum
The outcome of a TriggerEvaluationRecord is the result of the evaluation. A trigger can either invoke its associated action (one or many times) or be skipped. If skipped, a skip reason is provided.
- InvokedAction = 'invoked_action'#
- Skipped = 'skipped'#
- class roboto.domain.actions.TriggerEvaluationOutcomeReason#
Bases:
enum.Enum
Context for why a trigger evaluation has its TriggerEvaluationOutcome
- AlreadyRun = 'already_run'#
This trigger has already run its associated action for this dataset and/or file.
- ConditionNotMet = 'condition_not_met'#
The trigger’s condition is not met.
- NoMatchingFiles = 'no_matching_files'#
In the case of a dataset trigger, there is no subset of files that, combined, match ALL of the trigger’s required inputs.
In the case of a file trigger, there are no files that match ANY of the trigger’s required inputs.
- TriggerDisabled = 'trigger_disabled'#
The trigger is disabled.
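The two NoMatchingFiles rules differ in quantifier: a dataset trigger needs ALL required inputs matched by some file, while a file trigger acts on files matching ANY required input. A minimal sketch follows, assuming Python's fnmatch as a stand-in for the platform's glob matching, which may treat patterns such as ** differently. `dataset_matches` and `matching_files` are hypothetical helpers.

```python
from fnmatch import fnmatch


def dataset_matches(paths: list[str], required_inputs: list[str]) -> bool:
    """Dataset trigger: every pattern must be matched by at least one file."""
    return all(any(fnmatch(p, pat) for p in paths) for pat in required_inputs)


def matching_files(paths: list[str], required_inputs: list[str]) -> list[str]:
    """File trigger: keep each file that matches at least one pattern."""
    return [p for p in paths if any(fnmatch(p, pat) for pat in required_inputs)]


paths = ["run1/drive.bag", "run1/notes.txt"]

# No .csv file exists, so a dataset trigger requiring both patterns would be skipped.
both_required = dataset_matches(paths, ["**/*.bag", "**/*.csv"])
bag_only = dataset_matches(paths, ["**/*.bag"])

# A file trigger with the same patterns still finds the .bag file.
per_file = matching_files(paths, ["**/*.bag", "**/*.csv"])
```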
- class roboto.domain.actions.TriggerEvaluationRecord(/, **data)#
Bases:
pydantic.BaseModel
Record of a point-in-time evaluation of whether to invoke an action associated with a trigger for a data source.
- Parameters:
data (Any)
- cause: TriggerEvaluationCause | None = None#
- data_constraint: TriggerEvaluationDataConstraint | None = None#
- evaluation_end: datetime.datetime | None = None#
- evaluation_start: datetime.datetime#
- outcome: TriggerEvaluationOutcome | None = None#
- outcome_reason: TriggerEvaluationOutcomeReason | None = None#
- status: TriggerEvaluationStatus#
- status_detail: str | None = None#
- trigger_evaluation_id: int#
- trigger_id: str#
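To show how the status, outcome, and outcome_reason fields relate, here is a small sketch that summarizes a record in one line. `RecordStub` is a hypothetical stand-in that carries a subset of the documented fields as plain strings, and the trigger ID `tr_1` is made up for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RecordStub:
    """Stand-in with a subset of the documented fields, as plain strings."""
    trigger_id: str
    status: str
    outcome: Optional[str] = None
    outcome_reason: Optional[str] = None
    status_detail: Optional[str] = None


def describe(record):
    """Summarize one evaluation record in a single line."""
    if record.status == "pending":
        return f"{record.trigger_id}: evaluation pending"
    if record.status == "failed":
        return f"{record.trigger_id}: evaluation failed ({record.status_detail})"
    if record.outcome == "skipped":
        return f"{record.trigger_id}: skipped ({record.outcome_reason})"
    return f"{record.trigger_id}: invoked action"


print(describe(RecordStub("tr_1", "evaluated", "skipped", "trigger_disabled")))
```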
- class roboto.domain.actions.TriggerEvaluationStatus#
Bases:
enum.Enum
When a trigger is selected for evaluation, a trigger evaluation record is created with a status of Pending. The evaluation can either run to completion (regardless of its outcome), in which case the status is Evaluated, or hit an unexpected exception, in which case the status is Failed.
- Evaluated = 'evaluated'#
- Failed = 'failed'#
- Pending = 'pending'#
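The lifecycle described above can be sketched as a small function. `EvalStatus` is a local stand-in with the documented values, and `run_evaluation` is a hypothetical helper; the actual service logic is not shown on this page.

```python
import enum


class EvalStatus(enum.Enum):
    """Local stand-in with the documented values."""
    Evaluated = "evaluated"
    Failed = "failed"
    Pending = "pending"


def run_evaluation(evaluate):
    """Return the final (status, status_detail) for one evaluation attempt."""
    # The record starts as Pending; this sketch only models the final state.
    try:
        evaluate()
    except Exception as exc:  # unexpected error during evaluation
        return EvalStatus.Failed, str(exc)
    return EvalStatus.Evaluated, None


def skipped():
    return "skipped"  # a skip still counts as a completed evaluation


def broken():
    raise RuntimeError("query backend unavailable")


print(run_evaluation(skipped))
print(run_evaluation(broken))
```

Note that a skipped trigger still ends in Evaluated: the status tracks whether the evaluation ran to completion, not what it decided.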
- class roboto.domain.actions.TriggerEvaluationsSummaryResponse(/, **data)#
Bases:
pydantic.BaseModel
Response containing summary information about trigger evaluations.
Provides high-level statistics about trigger evaluation status, useful for monitoring and debugging trigger performance.
- Parameters:
data (Any)
- count_pending: int#
Number of trigger evaluations currently pending.
- last_evaluation_start: datetime.datetime | None#
Timestamp of the most recent evaluation start, if any evaluations have occurred.
- class roboto.domain.actions.TriggerForEachPrimitive#
Bases:
str, enum.Enum
Defines the granularity at which a trigger executes.
Determines whether the trigger creates one invocation per dataset or one invocation per file within datasets that match the trigger conditions.
- Dataset = 'dataset'#
Execute one action invocation per dataset that matches the trigger conditions.
- DatasetFile = 'dataset_file'#
Execute one action invocation per file in datasets that match the trigger conditions.
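The difference in granularity can be illustrated with a stand-in enum. `ForEach` mirrors the documented values but is defined locally, and `invocation_count` is a hypothetical helper that assumes per-file invocations are created for the files passed to it.

```python
import enum


class ForEach(str, enum.Enum):
    """Local stand-in with the documented values."""
    Dataset = "dataset"
    DatasetFile = "dataset_file"


def invocation_count(for_each, files):
    """Count invocations for one dataset that satisfies the trigger."""
    if for_each is ForEach.Dataset:
        return 1  # one invocation for the whole dataset
    return len(files)  # one invocation per file


files = ["a.bag", "b.bag", "c.bag"]
print(invocation_count(ForEach.Dataset, files))
print(invocation_count(ForEach("dataset_file"), files))
# The str mixin allows comparison with plain strings.
print(ForEach.Dataset == "dataset")
```

Because the enum also derives from str, its members can be constructed from, and compared with, the raw string values shown above.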
- class roboto.domain.actions.TriggerRecord(/, **data)#
Bases:
pydantic.BaseModel
A wire-transmissible representation of a trigger.
Contains all the configuration and metadata for a trigger, including the target action, input requirements, conditions, and execution settings.
This is the underlying data structure used by the Trigger domain class to store and transmit trigger information.
- Parameters:
data (Any)
- action: roboto.domain.actions.action_record.ActionReference#
Reference to the action that should be invoked.
- additional_inputs: list[str] | None = None#
Optional additional file patterns to include.
- causes: list[TriggerEvaluationCause] | None = None#
List of events that can cause this trigger to be evaluated.
- compute_requirement_overrides: roboto.domain.actions.action_record.ComputeRequirements | None = None#
Optional compute requirement overrides.
- condition: roboto.query.ConditionType | None = None#
Optional condition that must be met for the trigger to fire.
- container_parameter_overrides: roboto.domain.actions.action_record.ContainerParameters | None = None#
Optional container parameter overrides.
- created: datetime.datetime#
Timestamp when the trigger was created.
- created_by: str#
ID of the user who created the trigger.
- enabled: bool = True#
Whether the trigger is currently active.
- for_each: TriggerForEachPrimitive#
Granularity of trigger execution (Dataset or DatasetFile).
- modified: datetime.datetime#
Timestamp when the trigger was last modified.
- modified_by: str#
ID of the user who last modified the trigger.
- name: str#
Human-readable name for the trigger.
- org_id: str#
Organization ID that owns the trigger.
- parameter_values: dict[str, Any] = None#
Parameter values to pass to the action.
- required_inputs: list[str]#
File patterns that must be present for the trigger to fire.
- service_user_id: str#
Service user ID for authentication.
- timeout: int | None = None#
Optional timeout override for action execution.
- trigger_id: str#
Unique identifier for the trigger.
- validate_additional_inputs(value)#
- Parameters:
value (Optional[list[str]])
- Return type:
Optional[list[str]]
- validate_required_inputs(value)#
- Parameters:
value (list[str])
- Return type:
list[str]
- class roboto.domain.actions.UpdateActionRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to update an action.
Contains the changes to apply to an existing action. Only specified fields will be updated; others remain unchanged. Uses NotSet sentinel values to distinguish between explicit None values and unspecified fields.
- Parameters:
data (Any)
- compute_requirements: roboto.domain.actions.action_record.ComputeRequirements | roboto.sentinels.NotSetType | None#
New compute requirements (CPU, memory).
- container_parameters: roboto.domain.actions.action_record.ContainerParameters | roboto.sentinels.NotSetType | None#
New container parameters (image, entrypoint, etc.).
- description: str | roboto.sentinels.NotSetType | None#
New detailed description.
- inherits: roboto.domain.actions.action_record.ActionReference | roboto.sentinels.NotSetType | None#
New action reference to inherit from.
- metadata_changeset: roboto.updates.MetadataChangeset | roboto.sentinels.NotSetType#
Changes to apply to metadata (add, remove, update keys).
- model_config#
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- parameter_changeset: roboto.domain.actions.action_record.ActionParameterChangeset | roboto.sentinels.NotSetType#
Changes to apply to parameters (add, remove, update).
- requires_downloaded_inputs: bool | roboto.sentinels.NotSetType#
Whether to download input files before execution.
- short_description: str | roboto.sentinels.NotSetType | None#
New brief description (max 140 characters).
- timeout: int | roboto.sentinels.NotSetType | None#
New maximum execution time in minutes.
- uri: str | roboto.sentinels.NotSetType | None#
New container image URI.
- validate_uri(v)#
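The role of the NotSet sentinel in partial updates can be sketched as follows. `NotSetType`, `NotSet`, and `apply_update` below are hypothetical stand-ins written for this example; they illustrate the idea rather than reproduce `roboto.sentinels` or the service's update logic.

```python
class NotSetType:
    """Hypothetical stand-in for roboto.sentinels.NotSetType."""

    def __repr__(self):
        return "NotSet"


NotSet = NotSetType()


def apply_update(current, **changes):
    """Return a copy of `current` with only explicitly specified fields changed."""
    updated = dict(current)
    for field, value in changes.items():
        if value is NotSet:
            continue  # unspecified: keep the existing value
        updated[field] = value  # an explicit None clears the field
    return updated


action = {"description": "Extracts topics", "timeout": 30}
result = apply_update(action, description=None, timeout=NotSet)
print(result)
```

Here `description` is explicitly cleared with None, while `timeout` is left at 30 because it was never specified. A plain None default could not express both cases.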
- class roboto.domain.actions.UpdateInvocationStatus(/, **data)#
Bases:
pydantic.BaseModel
Request payload to update an invocation’s status.
Used to record status changes during invocation execution, such as transitioning from Queued to Running to Completed.
- Parameters:
data (Any)
- detail: str#
Additional detail about the status change.
- status: roboto.domain.actions.invocation_record.InvocationStatus#
The new status for the invocation.
- class roboto.domain.actions.UpdateTriggerRequest(/, **data)#
Bases:
pydantic.BaseModel
Request payload to update an existing trigger.
Contains the changes to apply to a trigger. Only specified fields will be updated; others remain unchanged. Uses NotSet sentinel values to distinguish between explicit None values and unspecified fields.
- Parameters:
data (Any)
- action_digest: str | roboto.sentinels.NotSetType | None#
New specific version digest of the action.
- action_name: str | roboto.sentinels.NotSetType#
New action name to invoke.
- action_owner_id: str | roboto.sentinels.NotSetType#
New organization ID that owns the target action.
- additional_inputs: list[str] | roboto.sentinels.NotSetType | None#
New additional file patterns to include.
- causes: list[roboto.domain.actions.trigger_record.TriggerEvaluationCause] | roboto.sentinels.NotSetType#
New list of events that can cause trigger evaluation.
- compute_requirement_overrides: roboto.domain.actions.ComputeRequirements | roboto.sentinels.NotSetType | None#
New compute requirement overrides.
- condition: roboto.query.ConditionType | roboto.sentinels.NotSetType | None#
New condition that must be met for the trigger to fire.
- container_parameter_overrides: roboto.domain.actions.ContainerParameters | roboto.sentinels.NotSetType | None#
New container parameter overrides.
- enabled: bool | roboto.sentinels.NotSetType#
New enabled status for the trigger.
- for_each: roboto.domain.actions.trigger_record.TriggerForEachPrimitive | roboto.sentinels.NotSetType#
New execution granularity (Dataset or DatasetFile).
- model_config#
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- parameter_values: dict[str, Any] | roboto.sentinels.NotSetType | None#
New parameter values to pass to the action.
- required_inputs: list[str] | roboto.sentinels.NotSetType#
New list of required file patterns.
- timeout: int | roboto.sentinels.NotSetType | None#
New timeout override for action invocations.
- class roboto.domain.actions.UploadDestinationType#
Bases:
enum.Enum
Type of upload destination for invocation outputs.
Defines where files generated by action invocations should be uploaded. Currently supports datasets as the primary destination type.
- Dataset = 'Dataset'#
Outputs will be uploaded to a dataset. This is the default.
- Unknown = 'Unknown'#
The output destination is unknown.
This destination type exists for compatibility between different versions of the Roboto SDK and the Roboto service backend. It should not be used directly in action invocation requests. If you encounter it in an SDK response, consider upgrading to the latest available SDK version.
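One common way to get this kind of cross-version compatibility is an enum that maps unrecognized values to a fallback member. The sketch below assumes that approach; whether the SDK implements it this way is not stated here, and `DestinationType` and the value "Topic" are hypothetical.

```python
import enum


class DestinationType(enum.Enum):
    """Local stand-in with the documented values."""
    Dataset = "Dataset"
    Unknown = "Unknown"

    @classmethod
    def _missing_(cls, value):
        # Assumed fallback: map unrecognized values to Unknown instead of raising.
        return cls.Unknown


print(DestinationType("Dataset"))
# "Topic" is a made-up value standing in for one added by a newer backend.
print(DestinationType("Topic"))
```

With this pattern an older client can still parse a response containing a destination type it does not recognize, at the cost of losing the specific value.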