API de flujo de trabajo funcional

Warning

La API de flujo de trabajo funcional es experimental y está sujeta a cambios o eliminación en versiones futuras sin previo aviso.

La API de flujo de trabajo funcional permite escribir flujos de trabajo como funciones asincrónicas de Python. En lugar de definir clases de ejecutor, conectar bordes y usar WorkflowBuilder, decora una función async con @workflow y utiliza el flujo de control nativo de Python — if/else, bucles for, asyncio.gather — para expresar la lógica.

Para obtener una comparación lado a lado con la API de gráfico, consulte API de flujos de trabajo en la descripción general de los flujos de trabajo.

@workflow decorador

Aplicar @workflow a una async función para convertirlo en un FunctionalWorkflow objeto :

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

El @workflow decorador admite un formulario con parámetros con argumentos opcionales:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

Parámetros @workflow

Parámetro Tipo Description
name str | None Nombre del flujo de trabajo para mostrar. El valor por defecto de la función es __name__.
description str | None Descripción opcional comprensible para humanos.
checkpoint_storage CheckpointStorage | None Almacenamiento predeterminado para conservar los resultados de los pasos entre ejecuciones. Se puede invalidar por cada llamada en run().

Firma de función de flujo de trabajo

El primer parámetro de la función de flujo de trabajo recibe la entrada pasada a .run(). Agregue un ctx: WorkflowRunContext parámetro solo cuando necesite HITL, estado de clave/valor o eventos personalizados; de lo contrario, es opcional:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext se detecta primero por anotación de tipo y, a continuación, por el nombre ctx del parámetro, por lo que tanto ctx: WorkflowRunContext como un parámetro sin ctx funcionan.

Ejecutar un flujo de trabajo

Llame al .run() en el objeto FunctionalWorkflow devuelto por @workflow:

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

Parámetros run()

Parámetro Tipo Description
message Any | None Entrada pasada a la función de flujo de trabajo como primer argumento.
stream bool Si True, devuelve un ResponseStream que produce WorkflowEvent objetos. Tiene como valor predeterminado False.
responses dict[str, Any] | None Respuestas HITL basadas en claves request_id. Se usa para reanudar un flujo de trabajo suspendido.
checkpoint_id str | None Punto de control desde el que se va a restaurar. Requiere establecer checkpoint_storage.
checkpoint_storage CheckpointStorage | None Invalida la configuración de almacenamiento predeterminada establecida en el decorador para esta ejecución.
include_status_events bool Incluya eventos de cambio de estado en el resultado que no sea de streaming.

Se debe proporcionar exactamente uno de message, responseso checkpoint_id por llamada.

WorkflowRunResult

run() (sin streaming) devuelve un WorkflowRunResult. Métodos clave:

Método/propiedad Returns Description
.text str Primera salida como una cadena. Cadena vacía si no hay cadena de salida.
.get_outputs() list[Any] Todas las salidas emitidas por el flujo de trabajo.
.get_final_state() WorkflowRunState Estado de ejecución final (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Solicitudes HITL pendientes cuando el estado es IDLE_WITH_PENDING_REQUESTS.

Transmisión en línea

Pase stream=True para recibir eventos a medida que se produzcan:

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Consulte python/samples/03-workflows/functional/basic_streaming_pipeline.py para obtener un ejemplo completo.

@step decorador

@step es un decorador opcional que agrega almacenamiento en caché de resultados, emisión de eventos y puntos de comprobación por paso a funciones asincrónicas individuales:

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Qué @step hace dentro de un flujo de trabajo

  • Almacena en caché los resultados; el resultado se almacena mediante (step_name, call_index). En la restauración del punto de control o la reanudación de HITL, un paso completado devuelve su resultado guardado de inmediato en lugar de ejecutarse nuevamente.
  • Emite eventos: executor_invoked / executor_completed / executor_failed se emiten para la observabilidad. En un acierto de caché, executor_bypassed se emite en su lugar.
  • Guarda los puntos de control : si el flujo de trabajo tiene checkpoint_storage, se guarda un punto de control después de que se complete cada paso.
  • Inyecta WorkflowRunContext — si la función de paso declara un ctx: WorkflowRunContext parámetro, el contexto activo se inyecta automáticamente.

Fuera de un flujo de trabajo en ejecución, @step es transparente, la función se comporta de forma idéntica a su versión sin decorar, lo que hace que sea completamente comprobable de forma aislada.

Cuándo debe usarse @step

Use @step en funciones que son costosas para volver a ejecutar: llamadas de agente, solicitudes de API externas o cualquier operación donde la reejecución al reanudar podría ser costosa o tener efectos secundarios. Las funciones sin formato (sin @step) siguen funcionando dentro de @workflow; simplemente se vuelven a ejecutar cuando se reanuda el flujo de trabajo.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step también acepta un name parámetro:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Consulte python/samples/03-workflows/functional/steps_and_checkpointing.py para obtener un ejemplo completo.

WorkflowRunContext

WorkflowRunContext (alias corto: RunContext) es el contexto de ejecución inyectado en los workflows y funciones de paso. Solo lo necesita cuando se usa HITL, estado de clave-valor o eventos personalizados.

Impórtelo desde agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Humano en el bucle

ctx.request_info() suspende el flujo de trabajo para esperar la entrada externa:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parámetros:

Parámetro Tipo Description
request_data Any Carga que describe qué entrada se necesita (dict, modelo Pydantic, cadena, ...).
response_type type Se esperaba un tipo de respuesta en Python.
request_id str | None Identificador estable para esta solicitud. Se genera un UUID aleatorio si se omite.

Semántica de reproducción: En la primera ejecución, request_info() genera una señal interna (nunca visible para el código) que suspende el flujo de trabajo. El llamante recibe un WorkflowRunResult con get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Reanude mediante una llamada a .run(responses={request_id: value}): el flujo de trabajo se vuelve a ejecutar desde el principio, y request_info() devuelve el valor proporcionado inmediatamente.

@stepLas funciones decoradas que se ejecutaron antes de la suspensión devuelven sus resultados almacenados en caché al reanudar, en lugar de volver a ejecutarse.

Control de la respuesta:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Consulte python/samples/03-workflows/functional/hitl_review.py para obtener un ejemplo completo.

ctx.request_info() también se admite dentro de @step funciones.

ctx.add_event() — Eventos personalizados

Use ctx.add_event() para emitir eventos específicos de la aplicación junto con los eventos del ciclo de vida del marco. Para obtener detalles completos y ejemplos, consulte Emisión de eventos personalizados.

ctx.get_state() / ctx.set_state() — Estado de clave/valor

Use ctx.get_state() y ctx.set_state() para almacenar valores que persistan en las interrupciones de HITL y se incluyan en los puntos de control. Para más información, consulte Estado del flujo de trabajo.

Los valores de estado deben ser serializables con JSON cuando se configura el almacenamiento de puntos de control.

ctx.is_streaming()

Devuelve True cuando se inició la ejecución actual con stream=True. Resulta útil dentro de las funciones de paso que quieren ajustar su comportamiento en función del modo de streaming.

get_run_context()

Recupera el activo WorkflowRunContext desde cualquier lugar dentro de un flujo de trabajo en ejecución, útil en las funciones auxiliares que no declaran un ctx parámetro:

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Devuelve None cuando se llama fuera de un flujo de trabajo en ejecución.

Paralelismo con asyncio.gather

Use la concurrencia estándar de Python para el fan-out/fan-in, sin necesidad de primitivos de framework.

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather también funciona cuando las funciones están decoradas con @step.

Consulte python/samples/03-workflows/functional/parallel_pipeline.py para obtener un ejemplo completo.

Llamar a agentes dentro de flujos de trabajo

Las llamadas de agente funcionan como llamadas de función simples dentro de @workflow:

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Agregue @step a las funciones de llamadas de agente cuando desee almacenar en caché sus resultados en la reanudación de HITL o restauración de puntos de comprobación.

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Consulte python/samples/03-workflows/functional/agent_integration.py para obtener un ejemplo completo.

.as_agent() : uso de un flujo de trabajo como agente

Encapsula un FunctionalWorkflow como un objeto compatible con agentes usando .as_agent():

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() devuelve un FunctionalWorkflowAgent objeto que expone la misma run() interfaz que otros objetos de agente, lo que hace que los flujos de trabajo funcionales se puedan componer con cualquier sistema que acepte agentes.

Parámetro Tipo Description
name str | None Muestra el nombre del agente. El valor predeterminado es el nombre del flujo de trabajo.

Consulte python/samples/03-workflows/functional/agent_integration.py para obtener un ejemplo.

Samples

Los ejemplos ejecutables se encuentran en las siguientes carpetas de ejemplo:

Pasos siguientes

Temas relacionados::

La API de flujo de trabajo funcional no está disponible para C# en este momento.