Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Warning
La API de flujo de trabajo funcional es experimental y está sujeta a cambios o eliminación en versiones futuras sin previo aviso.
La API de flujo de trabajo funcional permite escribir flujos de trabajo como funciones asincrónicas de Python. En lugar de definir clases de ejecutor, conectar bordes y usar WorkflowBuilder, decora una función async con @workflow y utiliza el flujo de control nativo de Python — if/else, bucles for, asyncio.gather — para expresar la lógica.
Para obtener una comparación lado a lado con la API de gráfico, consulte API de flujos de trabajo en la descripción general de los flujos de trabajo.
@workflow decorador
Aplicar @workflow a una async función para convertirlo en un FunctionalWorkflow objeto :
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
El @workflow decorador admite un formulario con parámetros con argumentos opcionales:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
Parámetros @workflow
| Parámetro | Tipo | Description |
|---|---|---|
name |
str | None |
Nombre del flujo de trabajo para mostrar. El valor por defecto de la función es __name__. |
description |
str | None |
Descripción opcional comprensible para humanos. |
checkpoint_storage |
CheckpointStorage | None |
Almacenamiento predeterminado para conservar los resultados de los pasos entre ejecuciones. Se puede invalidar por cada llamada en run(). |
Firma de función de flujo de trabajo
El primer parámetro de la función de flujo de trabajo recibe la entrada pasada a .run(). Agregue un ctx: WorkflowRunContext parámetro solo cuando necesite HITL, estado de clave/valor o eventos personalizados; de lo contrario, es opcional:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext se detecta primero por anotación de tipo y, a continuación, por el nombre ctx del parámetro, por lo que tanto ctx: WorkflowRunContext como un parámetro sin ctx funcionan.
Ejecutar un flujo de trabajo
Llame al .run() en el objeto FunctionalWorkflow devuelto por @workflow:
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
Parámetros run()
| Parámetro | Tipo | Description |
|---|---|---|
message |
Any | None |
Entrada pasada a la función de flujo de trabajo como primer argumento. |
stream |
bool |
Si True, devuelve un ResponseStream que produce WorkflowEvent objetos. Tiene como valor predeterminado False. |
responses |
dict[str, Any] | None |
Respuestas HITL basadas en claves request_id. Se usa para reanudar un flujo de trabajo suspendido. |
checkpoint_id |
str | None |
Punto de control desde el que se va a restaurar. Requiere establecer checkpoint_storage. |
checkpoint_storage |
CheckpointStorage | None |
Invalida la configuración de almacenamiento predeterminada establecida en el decorador para esta ejecución. |
include_status_events |
bool |
Incluya eventos de cambio de estado en el resultado que no sea de streaming. |
Se debe proporcionar exactamente uno de message, responseso checkpoint_id por llamada.
WorkflowRunResult
run() (sin streaming) devuelve un WorkflowRunResult. Métodos clave:
| Método/propiedad | Returns | Description |
|---|---|---|
.text |
str |
Primera salida como una cadena. Cadena vacía si no hay cadena de salida. |
.get_outputs() |
list[Any] |
Todas las salidas emitidas por el flujo de trabajo. |
.get_final_state() |
WorkflowRunState |
Estado de ejecución final (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...). |
.get_request_info_events() |
list[WorkflowEvent] |
Solicitudes HITL pendientes cuando el estado es IDLE_WITH_PENDING_REQUESTS. |
Transmisión en línea
Pase stream=True para recibir eventos a medida que se produzcan:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Consulte python/samples/03-workflows/functional/basic_streaming_pipeline.py para obtener un ejemplo completo.
@step decorador
@step es un decorador opcional que agrega almacenamiento en caché de resultados, emisión de eventos y puntos de comprobación por paso a funciones asincrónicas individuales:
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Qué @step hace dentro de un flujo de trabajo
-
Almacena en caché los resultados; el resultado se almacena mediante
(step_name, call_index). En la restauración del punto de control o la reanudación de HITL, un paso completado devuelve su resultado guardado de inmediato en lugar de ejecutarse nuevamente. -
Emite eventos:
executor_invoked/executor_completed/executor_failedse emiten para la observabilidad. En un acierto de caché,executor_bypassedse emite en su lugar. -
Guarda los puntos de control : si el flujo de trabajo tiene
checkpoint_storage, se guarda un punto de control después de que se complete cada paso. -
Inyecta
WorkflowRunContext— si la función de paso declara unctx: WorkflowRunContextparámetro, el contexto activo se inyecta automáticamente.
Fuera de un flujo de trabajo en ejecución, @step es transparente, la función se comporta de forma idéntica a su versión sin decorar, lo que hace que sea completamente comprobable de forma aislada.
Cuándo debe usarse @step
Use @step en funciones que son costosas para volver a ejecutar: llamadas de agente, solicitudes de API externas o cualquier operación donde la reejecución al reanudar podría ser costosa o tener efectos secundarios. Las funciones sin formato (sin @step) siguen funcionando dentro de @workflow; simplemente se vuelven a ejecutar cuando se reanuda el flujo de trabajo.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step también acepta un name parámetro:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Consulte python/samples/03-workflows/functional/steps_and_checkpointing.py para obtener un ejemplo completo.
WorkflowRunContext
WorkflowRunContext (alias corto: RunContext) es el contexto de ejecución inyectado en los workflows y funciones de paso. Solo lo necesita cuando se usa HITL, estado de clave-valor o eventos personalizados.
Impórtelo desde agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Humano en el bucle
ctx.request_info() suspende el flujo de trabajo para esperar la entrada externa:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Parámetros:
| Parámetro | Tipo | Description |
|---|---|---|
request_data |
Any |
Carga que describe qué entrada se necesita (dict, modelo Pydantic, cadena, ...). |
response_type |
type |
Se esperaba un tipo de respuesta en Python. |
request_id |
str | None |
Identificador estable para esta solicitud. Se genera un UUID aleatorio si se omite. |
Semántica de reproducción: En la primera ejecución, request_info() genera una señal interna (nunca visible para el código) que suspende el flujo de trabajo. El llamante recibe un WorkflowRunResult con get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Reanude mediante una llamada a .run(responses={request_id: value}): el flujo de trabajo se vuelve a ejecutar desde el principio, y request_info() devuelve el valor proporcionado inmediatamente.
@stepLas funciones decoradas que se ejecutaron antes de la suspensión devuelven sus resultados almacenados en caché al reanudar, en lugar de volver a ejecutarse.
Control de la respuesta:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Consulte python/samples/03-workflows/functional/hitl_review.py para obtener un ejemplo completo.
ctx.request_info() también se admite dentro de @step funciones.
ctx.add_event() — Eventos personalizados
Use ctx.add_event() para emitir eventos específicos de la aplicación junto con los eventos del ciclo de vida del marco. Para obtener detalles completos y ejemplos, consulte Emisión de eventos personalizados.
ctx.get_state()
/
ctx.set_state() — Estado de clave/valor
Use ctx.get_state() y ctx.set_state() para almacenar valores que persistan en las interrupciones de HITL y se incluyan en los puntos de control. Para más información, consulte Estado del flujo de trabajo.
Los valores de estado deben ser serializables con JSON cuando se configura el almacenamiento de puntos de control.
ctx.is_streaming()
Devuelve True cuando se inició la ejecución actual con stream=True. Resulta útil dentro de las funciones de paso que quieren ajustar su comportamiento en función del modo de streaming.
get_run_context()
Recupera el activo WorkflowRunContext desde cualquier lugar dentro de un flujo de trabajo en ejecución, útil en las funciones auxiliares que no declaran un ctx parámetro:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Devuelve None cuando se llama fuera de un flujo de trabajo en ejecución.
Paralelismo con asyncio.gather
Use la concurrencia estándar de Python para el fan-out/fan-in, sin necesidad de primitivos de framework.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather también funciona cuando las funciones están decoradas con @step.
Consulte python/samples/03-workflows/functional/parallel_pipeline.py para obtener un ejemplo completo.
Llamar a agentes dentro de flujos de trabajo
Las llamadas de agente funcionan como llamadas de función simples dentro de @workflow:
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Agregue @step a las funciones de llamadas de agente cuando desee almacenar en caché sus resultados en la reanudación de HITL o restauración de puntos de comprobación.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Consulte python/samples/03-workflows/functional/agent_integration.py para obtener un ejemplo completo.
.as_agent() : uso de un flujo de trabajo como agente
Encapsula un FunctionalWorkflow como un objeto compatible con agentes usando .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() devuelve un FunctionalWorkflowAgent objeto que expone la misma run() interfaz que otros objetos de agente, lo que hace que los flujos de trabajo funcionales se puedan componer con cualquier sistema que acepte agentes.
| Parámetro | Tipo | Description |
|---|---|---|
name |
str | None |
Muestra el nombre del agente. El valor predeterminado es el nombre del flujo de trabajo. |
Consulte python/samples/03-workflows/functional/agent_integration.py para obtener un ejemplo.
Samples
Los ejemplos ejecutables se encuentran en las siguientes carpetas de ejemplo:
-
python/samples/01-get-started/: ejemplos introductorios@workflow -
python/samples/03-workflows/functional/: ejemplos de flujo de trabajo funcionales de características completas
Pasos siguientes
Temas relacionados::
- Ejecutores : unidades de procesamiento en la API basada en grafos
- Human-in-the-loop : HITL en flujos de trabajo basados en grafos
- Puntos de control: almacenamiento de puntos de control y reanudación
- Eventos : tipos de eventos de flujo de trabajo
- Uso de flujos de trabajo como agentes
La API de flujo de trabajo funcional no está disponible para C# en este momento.