roboto.domain.actions.trigger_view#

Module Contents#

class roboto.domain.actions.trigger_view.TriggerOnEvent(/, **data)#

Bases: pydantic.BaseModel

Properties specific to event-driven triggers.

Parameters:

data (Any)

additional_inputs: list[str] | None = None#

Optional additional file patterns to include.

causes: list[roboto.domain.actions.trigger_record.TriggerEvaluationCause]#

One or more events that cause the trigger to be evaluated.

condition: roboto.query.ConditionType | None = None#

Optional condition that must be met for the trigger to fire.

for_each: roboto.domain.actions.trigger_record.TriggerForEachPrimitive#

Granularity of trigger execution.

required_inputs: list[str]#

File patterns that must be present for the trigger to fire.

class roboto.domain.actions.trigger_view.TriggerOnSchedule(/, **data)#

Bases: pydantic.BaseModel

Properties specific to scheduled triggers.

Parameters:

data (Any)

invocation_input: roboto.domain.actions.invocation_record.InvocationInput | None = None#

Input specification for each scheduled invocation.

next_occurrence: datetime.datetime | None = None#

Next scheduled invocation time.

schedule: str#

Recurring invocation schedule.

class roboto.domain.actions.trigger_view.TriggerType#

Bases: str, enum.Enum

Types of triggers supported by the Roboto platform.

EventDriven = 'event_driven'#

A trigger that invokes its target action in response to an event.

See TriggerEvaluationCause for the events that can currently cause an event-driven trigger to be evaluated.

Scheduled = 'scheduled'#

A trigger that invokes its target action on a recurring schedule.

class roboto.domain.actions.trigger_view.TriggerView(/, **data)#

Bases: pydantic.BaseModel

Unified data model for all Roboto trigger types.

Parameters:

data (Any)

action: roboto.domain.actions.action_record.ActionReference#

Reference to the trigger’s target action.

compute_requirement_overrides: roboto.domain.actions.action_record.ComputeRequirements | None = None#

Optional compute requirement overrides.

container_parameter_overrides: roboto.domain.actions.action_record.ContainerParameters | None = None#

Optional container parameter overrides.

created: datetime.datetime#

Creation time for the trigger.

created_by: str#

User who created the trigger.

enabled: bool#

True if the trigger is enabled.

invocation_upload_destination: roboto.domain.actions.invocation_record.InvocationUploadDestination | None = None#

Optional default upload destination for action invocations.

modified: datetime.datetime#

Latest modification time for the trigger.

modified_by: str#

User who last modified this trigger.

name: str#

Trigger name. Unique within an organization and trigger type.

on_event: TriggerOnEvent | None = None#

Properties of triggers that fire on events (TriggerType.EventDriven).

on_schedule: TriggerOnSchedule | None = None#

Properties of triggers that fire on a recurring schedule (TriggerType.Scheduled).

org_id: str#

ID of the organization that owns the trigger.

parameter_values: dict[str, Any] | None = None#

Optional action parameter values.

service_user_id: str#

Service user ID for authentication.

timeout: int | None = None#

Optional invocation timeout, in minutes.

to_event_trigger_record()#

Convert this trigger view into a TriggerRecord if possible.

Returns:

A TriggerRecord instance if self.trigger_type is TriggerType.EventDriven, otherwise None.

Return type:

Optional[roboto.domain.actions.trigger_record.TriggerRecord]

to_scheduled_trigger_record()#

Convert this trigger view into a ScheduledTriggerRecord if possible.

Returns:

A ScheduledTriggerRecord instance if self.trigger_type is TriggerType.Scheduled, otherwise None.

Return type:

Optional[roboto.domain.actions.scheduled_trigger_record.ScheduledTriggerRecord]

trigger_id: str#

Unique trigger ID.

trigger_type: TriggerType#

Trigger type.