Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Aviso
A API de fluxo de trabalho funcional é experimental e está sujeita a alterações ou remoção em versões futuras sem aviso prévio.
A API de fluxo de trabalho funcional permite que você escreva fluxos de trabalho como funções assíncronas Python simples. Em vez de definir classes executoras, bordas de fiação, e usando WorkflowBuilder, você decora uma função async com @workflow e usa o fluxo de controle de Python nativo — if/else, loops for, asyncio.gather — para expressar sua lógica.
Para obter uma comparação lado a lado com a API do grafo, consulte APIs de fluxo de trabalho na visão geral dos fluxos de trabalho.
@workflow Decorador
Aplique @workflow a uma função de async para convertê-la em um objeto de FunctionalWorkflow.
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
O @workflow decorador dá suporte a um formulário parametrizado com argumentos opcionais:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
Parâmetros @workflow
| Parâmetro | Tipo | Description |
|---|---|---|
name |
str | None |
Nome de exibição para o fluxo de trabalho. O padrão é __name__ da função. |
description |
str | None |
Descrição opcional legível por humanos. |
checkpoint_storage |
CheckpointStorage | None |
Armazenamento padrão para armazenar os resultados das etapas entre execuções. Pode ser substituído por chamada individual em run(). |
Assinatura de função de fluxo de trabalho
O primeiro parâmetro da função de fluxo de trabalho recebe a entrada passada para .run(). Adicione um ctx: WorkflowRunContext parâmetro somente quando você precisar de HITL, estado de chave/valor ou eventos personalizados . Caso contrário, ele será opcional:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext é detectado por anotação de tipo primeiro e, em seguida, pelo nome do parâmetro ctx, portanto, ambos ctx: WorkflowRunContext e um parâmetro simples ctx funcionam.
Executando um fluxo de trabalho
Chamar .run() no objeto FunctionalWorkflow retornado por @workflow:
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
Parâmetros run()
| Parâmetro | Tipo | Description |
|---|---|---|
message |
Any | None |
Entrada passada para a função workflow como primeiro argumento. |
stream |
bool |
Se True, retornará um ResponseStream que gera WorkflowEvent objetos. Usa False como padrão. |
responses |
dict[str, Any] | None |
Respostas HITL indexadas por request_id. Usado para retomar um fluxo de trabalho suspenso. |
checkpoint_id |
str | None |
Ponto de verificação do qual restaurar. Requer a definição de checkpoint_storage. |
checkpoint_storage |
CheckpointStorage | None |
Substitui a configuração de armazenamento padrão no decorador nesta execução. |
include_status_events |
bool |
Inclua eventos de alteração de status no resultado de não streaming. |
Exatamente apenas um de message, responses, ou checkpoint_id deve ser especificado por chamada.
WorkflowRunResult
run() (não-transmissão) retorna um WorkflowRunResult. Principais métodos:
| Método/propriedade | Returns | Description |
|---|---|---|
.text |
str |
Primeira saída como uma cadeia de caracteres. Cadeia de caracteres vazia se não houver saída de string. |
.get_outputs() |
list[Any] |
Todas as saídas emitidas pelo fluxo de trabalho. |
.get_final_state() |
WorkflowRunState |
Estado de execução final (IDLE, , IDLE_WITH_PENDING_REQUESTS, FAILED...). |
.get_request_info_events() |
list[WorkflowEvent] |
Solicitações HITL pendentes quando o estado é IDLE_WITH_PENDING_REQUESTS. |
Streaming
Passe stream=True para receber eventos conforme eles são produzidos:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Veja python/samples/03-workflows/functional/basic_streaming_pipeline.py para um exemplo completo.
@step Decorador
@step é um decorador opt-in que adiciona cache de resultados, emissão de eventos e ponto de verificação por etapa a funções assíncronas individuais:
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
O que @step faz dentro de um fluxo de trabalho
-
Resultados de caches — o resultado é armazenado por
(step_name, call_index). Na retomada HITL ou restauração do ponto de verificação, uma etapa concluída retorna instantaneamente seu resultado salvo em vez de reexecutar. -
Emite eventos –
executor_invoked/executor_completed/executor_failedsão emitidos para observabilidade. Em vez disso, em um cache atingido,executor_bypassedé emitido. -
Salva pontos de verificação – se o fluxo de trabalho tiver
checkpoint_storage, um ponto de verificação é salvo após a conclusão de cada etapa. -
Injeta
WorkflowRunContext— se a função de etapa declarar um parâmetroctx: WorkflowRunContext, o contexto ativo será automaticamente injetado.
Fora de um fluxo de trabalho em execução, @step é transparente — a função se comporta de forma idêntica à sua versão não decorada, tornando-a totalmente testável em isolamento.
Quando usar @step
Use @step em funções que são custosas para executar novamente: chamadas de agente, solicitações de API externas ou qualquer operação em que a re-execução na retomada seja custosa ou tenha efeitos colaterais. As funções simples (sem @step) ainda funcionam no interior @workflow; elas simplesmente são executadas novamente quando o fluxo de trabalho é retomado.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step também aceita um name parâmetro:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Veja python/samples/03-workflows/functional/steps_and_checkpointing.py para um exemplo completo.
WorkflowRunContext
WorkflowRunContext (alias curto: RunContext) é o contexto de execução injetado em funções de fluxo de trabalho e etapas. Você só precisa dele quando usa HITL, estado de chave/valor ou eventos personalizados.
Importe-o de agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Humano-no-laço
ctx.request_info() suspende o fluxo de trabalho para aguardar a entrada externa:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Parâmetros:
| Parâmetro | Tipo | Description |
|---|---|---|
request_data |
Any |
Conteúdo que descreve qual entrada é necessária (dicionário, modelo Pydantic, cadeia de caracteres, ...). |
response_type |
type |
Tipo de resposta esperado do Python. |
request_id |
str | None |
Identificador estável para essa solicitação. Uma UUID aleatória é gerada se omitida. |
Semântica de reprodução: Na primeira execução, request_info() gera um sinal interno (nunca visível ao seu código) que suspende o fluxo de trabalho. O chamador recebe um WorkflowRunResult com get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Retome chamando .run(responses={request_id: value}) – o fluxo de trabalho é executado novamente na parte superior e request_info() retorna o valor fornecido imediatamente.
@step-Funções decoradas que foram executadas antes da suspensão retornam seus resultados armazenados em cache ao retomar, em vez de serem executadas novamente.
Manipulando a resposta:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Veja python/samples/03-workflows/functional/hitl_review.py para um exemplo completo.
ctx.request_info() também é compatível dentro de funções @step.
ctx.add_event() — Eventos personalizados
Use ctx.add_event() para emitir eventos específicos do aplicativo junto com eventos de ciclo de vida da estrutura. Para obter detalhes e exemplos completos, consulte Emitindo eventos personalizados.
ctx.get_state()
/
ctx.set_state() — Estado de chave/valor
Use ctx.get_state() e ctx.set_state() para armazenar valores que persistam entre interrupções de HITL e que são incluídos em checkpoints. Para obter detalhes completos, consulte o estado do fluxo de trabalho.
Os valores de estado devem ser serializáveis em JSON quando o armazenamento de ponto de verificação estiver configurado.
ctx.is_streaming()
Retorna True quando a execução atual foi iniciada com stream=True. Útil dentro de funções de etapa que desejam ajustar seu comportamento com base no modo de streaming.
get_run_context()
Recupera o elemento ativo WorkflowRunContext de qualquer lugar dentro de um fluxo de trabalho em execução — útil em funções auxiliares que não declaram um parâmetro ctx.
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Retorna None quando chamado fora de um fluxo de trabalho em execução.
Paralelismo com asyncio.gather
Utilize a concorrência padrão do Python para fan-out/fan-in — não são necessários primitivos de estrutura.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather também funciona quando as funções são decoradas com @step.
Veja python/samples/03-workflows/functional/parallel_pipeline.py para um exemplo completo.
Chamar agentes dentro de fluxos de trabalho
As chamadas de agente funcionam como chamadas de função simples dentro de @workflow:
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Adicione @step às funções de chamada de agente quando desejar que seus resultados sejam armazenados em cache durante retomadas HITL ou restaurações de ponto de verificação.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Veja python/samples/03-workflows/functional/agent_integration.py para um exemplo completo.
.as_agent() — Usando um fluxo de trabalho como um agente
Envolver um FunctionalWorkflow como objeto compatível com um agente com .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() retorna um FunctionalWorkflowAgent que expõe a mesma run() interface que outros objetos de agente, tornando os fluxos de trabalho funcionais composíveis com qualquer sistema que aceite agentes.
| Parâmetro | Tipo | Description |
|---|---|---|
name |
str | None |
Nome de exibição do agente. O padrão é o nome do fluxo de trabalho. |
Veja python/samples/03-workflows/functional/agent_integration.py para um exemplo.
Samples
Exemplos executáveis estão nas seguintes pastas de exemplo:
-
python/samples/01-get-started/— exemplos introdutórios@workflow -
python/samples/03-workflows/functional/— exemplos completos de fluxo de trabalho funcional com todos os recursos
Próximas Etapas
Tópicos relacionados:
- Executores – unidades de processamento na API baseada em grafo
- Human-in-the-loop – HITL em fluxos de trabalho baseados em grafos
- Pontos de verificação – armazenamento de ponto de verificação e retomada
- Eventos – tipos de evento de fluxo de trabalho
- Usando fluxos de trabalho como agentes
A API de fluxo de trabalho funcional não está disponível para C# no momento.