Skip to main content

Overview

PipelineTask is the central class for managing pipeline execution. It handles the lifecycle of the pipeline, processes frames in both directions, manages task cancellation, and provides event handlers for monitoring pipeline activity.

Basic Usage

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask

# Create a pipeline
pipeline = Pipeline([...])

# Create a task with the pipeline
task = PipelineTask(pipeline)

# Queue frames for processing
await task.queue_frame(TTSSpeakFrame("Hello, how can I help you today?"))

# Run the pipeline
runner = PipelineRunner()
await runner.run(task)

Constructor Parameters

pipeline
BasePipeline
required
The pipeline to execute.
params
PipelineParams
default:"PipelineParams()"
Configuration parameters for the pipeline. See PipelineParams for details.
observers
List[BaseObserver]
default:"[]"
List of observers for monitoring pipeline execution. See Observers for details.
clock
BaseClock
default:"SystemClock()"
Clock implementation for timing operations.
task_manager
Optional[BaseTaskManager]
default:"None"
Custom task manager for handling asyncio tasks. If None, a default TaskManager is used.
check_dangling_tasks
bool
default:"True"
Whether to check for processors’ tasks finishing properly.
idle_timeout_secs
Optional[float]
default:"300"
Timeout in seconds before considering the pipeline idle. Set to None to disable idle detection. See Pipeline Idle Detection for details.
idle_timeout_frames
Tuple[Type[Frame], ...]
default:"(BotSpeakingFrame, LLMFullResponseEndFrame)"
Frame types that should prevent the pipeline from being considered idle. See Pipeline Idle Detection for details.
cancel_on_idle_timeout
bool
default:"True"
Whether to automatically cancel the pipeline task when idle timeout is reached. See Pipeline Idle Detection for details.
enable_tracing
bool
default:"False"
Whether to enable OpenTelemetry tracing. See The OpenTelemetry guide for details.
enable_turn_tracking
bool
default:"False"
Whether to enable turn tracking. See The OpenTelemetry guide for details.
conversation_id
Optional[str]
default:"None"
Custom ID for the conversation. If not provided, a UUID will be generated. See The OpenTelemetry guide for details.
additional_span_attributes
Optional[dict]
default:"None"
Any additional attributes to add to top-level OpenTelemetry conversation span. See The OpenTelemetry guide for details.

Methods

Task Lifecycle Management

run()
async
Starts and manages the pipeline execution until completion or cancellation.
await task.run()
stop_when_done()
async
Sends an EndFrame to the pipeline to gracefully stop the task after all queued frames have been processed.
await task.stop_when_done()
cancel()
async
Stops the running pipeline immediately by sending a CancelFrame.
  await task.cancel()
has_finished()
bool
Returns whether the task has finished (all processors have stopped).
if task.has_finished(): print("Task is complete")

Frame Management

queue_frame()
async
Queues a single frame to be pushed down the pipeline.
await task.queue_frame(TTSSpeakFrame("Hello!"))
queue_frames()
async
Queues multiple frames to be pushed down the pipeline.
frames = [TTSSpeakFrame("Hello!"), TTSSpeakFrame("How are you?")]

await task.queue_frames(frames)

Event Handlers

PipelineTask provides event handlers for monitoring pipeline lifecycle and frame flow. Register handlers using the @event_handler decorator.
EventDescription
on_pipeline_startedPipeline has started processing
on_pipeline_finishedPipeline reached a terminal state
on_pipeline_errorAn error frame reached the pipeline task
on_frame_reached_upstreamA filtered frame type reached the pipeline source
on_frame_reached_downstreamA filtered frame type reached the pipeline sink
on_idle_timeoutNo activity detected within the idle timeout period

on_pipeline_started

Fired when the StartFrame has been processed by all processors in the pipeline. This indicates the pipeline is fully initialized and running.
@task.event_handler("on_pipeline_started")
async def on_pipeline_started(task, frame):
    print("Pipeline is running!")
Parameters:
ParameterTypeDescription
taskPipelineTaskThe pipeline task instance
frameStartFrameThe start frame that was processed

on_pipeline_finished

Fired after the pipeline reaches any terminal state. This includes normal completion (EndFrame), explicit stop (StopFrame), or cancellation (CancelFrame). Use this event for cleanup, logging, or post-processing.
@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(task, frame):
    if isinstance(frame, EndFrame):
        print("Pipeline ended normally")
    elif isinstance(frame, CancelFrame):
        print("Pipeline was cancelled")
    elif isinstance(frame, StopFrame):
        print("Pipeline was stopped")
Parameters:
ParameterTypeDescription
taskPipelineTaskThe pipeline task instance
frameFrameThe terminal frame (EndFrame, StopFrame, or CancelFrame)
The deprecated events on_pipeline_ended, on_pipeline_stopped, and on_pipeline_cancelled still work but will emit a deprecation warning. Use on_pipeline_finished and inspect the frame type if you need to distinguish between terminal states.

on_pipeline_error

Fired when an ErrorFrame reaches the pipeline task (upstream from a processor). If the error is fatal, the pipeline will be cancelled after this handler runs.
@task.event_handler("on_pipeline_error")
async def on_pipeline_error(task, frame):
    print(f"Pipeline error: {frame.error}")
    if frame.fatal:
        print("Fatal error — pipeline will be cancelled")
Parameters:
ParameterTypeDescription
taskPipelineTaskThe pipeline task instance
frameErrorFrameThe error frame with error details

on_frame_reached_upstream

Fired when a frame of a registered type reaches the pipeline source (the start of the pipeline). You must configure which frame types trigger this event using set_reached_upstream_filter() or add_reached_upstream_filter().
from pipecat.frames.frames import TranscriptionFrame

# Configure which frame types to monitor
task.set_reached_upstream_filter((TranscriptionFrame,))

@task.event_handler("on_frame_reached_upstream")
async def on_frame_reached_upstream(task, frame):
    print(f"Frame reached upstream: {frame}")
Parameters:
ParameterTypeDescription
taskPipelineTaskThe pipeline task instance
frameFrameThe frame that reached the pipeline source
This event only fires for frame types you’ve explicitly registered. By default, no frame types are monitored. This is for efficiency — checking every frame would be wasteful when you typically only care about specific types.

on_frame_reached_downstream

Fired when a frame of a registered type reaches the pipeline sink (the end of the pipeline). You must configure which frame types trigger this event using set_reached_downstream_filter() or add_reached_downstream_filter().
from pipecat.frames.frames import TTSAudioRawFrame

# Configure which frame types to monitor
task.set_reached_downstream_filter((TTSAudioRawFrame,))

@task.event_handler("on_frame_reached_downstream")
async def on_frame_reached_downstream(task, frame):
    print(f"Frame reached downstream: {frame}")
Parameters:
ParameterTypeDescription
taskPipelineTaskThe pipeline task instance
frameFrameThe frame that reached the pipeline sink

on_idle_timeout

Fired when no activity frames (as specified by idle_timeout_frames) have been received within the idle timeout period. See Pipeline Idle Detection for configuration details.
@task.event_handler("on_idle_timeout")
async def on_idle_timeout(task):
    print("Pipeline has been idle too long")
    await task.queue_frame(TTSSpeakFrame("Are you still there?"))
Parameters:
ParameterTypeDescription
taskPipelineTaskThe pipeline task instance
If cancel_on_idle_timeout is True (the default), the pipeline will be automatically cancelled after this handler runs. Set it to False if you want to handle idle timeouts yourself.