Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Na orquestração sequencial, os agentes são organizados em um pipeline. Cada agente processa a tarefa por sua vez, passando sua saída para o próximo agente na sequência. Isso é ideal para fluxos de trabalho em que cada etapa se baseia na anterior, como revisão de documentos, pipelines de processamento de dados ou raciocínio em vários estágios.
Importante
Por defeito, cada agente na sequência consome toda a conversa do agente anterior — tanto as mensagens de entrada fornecidas ao agente anterior como as suas mensagens de resposta. Pode configurar os agentes para consumirem apenas as mensagens de resposta do agente anterior. Veja Controlo do Contexto Entre Agentes para mais detalhes.
O que você vai aprender
- Como criar um pipeline sequencial de agentes
- Como encadear agentes onde cada um se baseia na saída anterior
- Como adicionar aprovação humana integrativa em chamadas sensíveis de ferramentas tecnológicas
- Como misturar agentes com executores personalizados para tarefas especializadas
- Como acompanhar o fluxo de conversação através do pipeline
Defina seus agentes
Na orquestração sequencial, os agentes são organizados em um pipeline onde cada agente processa a tarefa por sua vez, passando a saída para o próximo agente na sequência.
Configurar o Cliente OpenAI do Azure
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Advertência
DefaultAzureCredential é conveniente para o desenvolvimento, mas requer uma consideração cuidadosa na produção. Em produção, considere usar uma credencial específica (por exemplo, ManagedIdentityCredential) para evitar problemas de latência, sondagens não intencionais de credenciais e potenciais riscos de segurança provenientes de mecanismos de recurso.
Crie agentes especializados que trabalharão em sequência:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
Configurar a orquestração sequencial
Crie o fluxo de trabalho usando AgentWorkflowBuilder:
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
Executar o fluxo de trabalho sequencial
Execute o fluxo de trabalho e processe os eventos:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
if (e.ExecutorId != lastExecutorId)
{
lastExecutorId = e.ExecutorId;
Console.WriteLine();
Console.Write($"{e.ExecutorId}: ");
}
Console.Write(e.Update.Text);
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display final result
Console.WriteLine();
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Saída de amostra
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
Orquestração Sequencial com Humano no Loop
Orquestrações sequenciais suportam interações humanas no ciclo através da aprovação da ferramenta. Quando os agentes usam ferramentas integradas com ApprovalRequiredAIFunction, o fluxo de trabalho pausa e emite um RequestInfoEvent contendo um ToolApprovalRequestContent. Sistemas externos (como um operador humano) podem inspecionar a chamada de ferramenta, aprová-la ou rejeitá-la, e o fluxo de trabalho retoma em conformidade.
Sugestão
Para mais detalhes sobre o modelo de pedido e resposta, veja Humano-no-Loop.
Defina Agentes com Ferramentas que Requerem Aprovação
Criar agentes onde as ferramentas sensíveis são protegidas com ApprovalRequiredAIFunction:
ChatClientAgent deployAgent = new(
client,
"You are a DevOps engineer. Check staging status first, then deploy to production.",
"DeployAgent",
"Handles deployments",
[
AIFunctionFactory.Create(CheckStagingStatus),
new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
]);
ChatClientAgent verifyAgent = new(
client,
"You are a QA engineer. Verify that the deployment was successful and summarize the results.",
"VerifyAgent",
"Verifies deployments");
Compilar e Executar com Gestão de Aprovações
Constrói o fluxo de trabalho sequencial normalmente. O fluxo de aprovação é gerido através do fluxo de eventos:
var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent e &&
e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
{
await run.SendResponseAsync(
e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
}
}
Observação
AgentWorkflowBuilder.BuildSequential() suporta aprovação de ferramentas logo de início — não é necessária configuração adicional. Quando um agente chama uma ferramenta envolvida com ApprovalRequiredAIFunction, o fluxo de trabalho pausa automaticamente e emite um RequestInfoEvent.
Sugestão
Para um exemplo completo e executável deste fluxo de aprovação, veja a GroupChatToolApproval amostra. O mesmo RequestInfoEvent padrão de condução aplica-se a outras orquestrações.
Conceitos-chave
- Processamento sequencial: Cada agente processa a saída do agente anterior em ordem
- AgentWorkflowBuilder.BuildSequential(): cria um fluxo de trabalho de pipeline a partir de uma coleção de agentes
- ChatClientAgent: representa um agente apoiado por um cliente de chat com instruções específicas
-
InProcessExecution.RunStreamingAsync(): Executa o fluxo de trabalho e retorna um
StreamingRunpara streaming de eventos em tempo real -
Manipulação de Eventos: Monitore o progresso através de
AgentResponseUpdateEvente a conclusão através deWorkflowOutputEvent -
Aprovação de Ferramentas: Envolver ferramentas sensíveis com
ApprovalRequiredAIFunctionpara exigir aprovação humana antes da execução -
RequestInfoEvent: Emitido quando uma ferramenta necessita de aprovação; contém
ToolApprovalRequestContentcom os detalhes da chamada de ferramenta
Na orquestração sequencial, cada agente processa a tarefa por sua vez, com a saída fluindo de um para o outro. Comece por definir agentes para um processo em duas etapas:
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
writer = chat_client.as_agent(
instructions=(
"You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
),
name="writer",
)
reviewer = chat_client.as_agent(
instructions=(
"You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
),
name="reviewer",
)
Configurar a orquestração sequencial
A SequentialBuilder classe cria um pipeline onde os agentes processam tarefas em ordem. Cada agente vê o histórico de conversas completo e adiciona sua resposta:
from agent_framework.orchestrations import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
Executar o fluxo de trabalho sequencial
Execute o fluxo de trabalho e recolha o resultado final. A saída do terminal é um AgentResponse contendo as mensagens de resposta do último agente:
from agent_framework import AgentResponse
# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()
if outputs:
print("===== Final Response =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
name = msg.author_name or "assistant"
print(f"[{name}]\n{msg.text}")
Saída de amostra
===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
Avançado: Misturando agentes com executores personalizados
A orquestração sequencial suporta agentes de mistura com executores personalizados para processamento especializado. Isso é útil quando você precisa de uma lógica personalizada que não requer um LLM:
Definir um executor personalizado
Observação
Quando um executor personalizado segue um agente na sequência, o seu handler recebe um AgentExecutorResponse (porque os agentes são internamente encapsulados por AgentExecutor). Use agent_response.full_conversation para aceder ao histórico completo das conversas. Um executor personalizado usado como último participante (terminador) deve chamar ctx.yield_output(AgentResponse(...)) para que a sua saída se torne a saída terminal do fluxo de trabalho.
from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never
class Summarizer(Executor):
"""Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""
@handler
async def summarize(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, AgentResponse]
) -> None:
if not agent_response.full_conversation:
await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
return
users = sum(1 for m in agent_response.full_conversation if m.role == "user")
assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
await ctx.yield_output(AgentResponse(messages=[summary]))
Crie um fluxo de trabalho sequencial misto
# Create a content agent
content = chat_client.as_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
Saída de exemplo com executor personalizado
===== Final Summary =====
Summary -> users:1 assistants:1
Controlo do Contexto Entre Agentes
Por defeito, cada agente num SequentialBuilder fluxo de trabalho consome toda a conversa do agente anterior (mensagens de entrada + resposta). A definição chain_only_agent_responses=True configura todos os agentes da sequência para consumirem apenas as mensagens de resposta do agente anterior:
workflow = SequentialBuilder(
participants=[writer, translator, reviewer],
chain_only_agent_responses=True,
).build()
Isto é útil para pipelines de tradução, refinamento progressivo e outros cenários onde cada agente deve focar-se exclusivamente em transformar a saída do agente anterior sem ser influenciado por turnos de conversa anteriores.
Para um exemplo completo, veja sequential_chain_only_agent_responses.py no repositório Agent Framework.
Sugestão
Para um controlo mais detalhado sobre o fluxo de contexto — incluindo funções de filtro personalizadas — veja Modos de Contexto na referência Agent Executor.
Saídas Intermédias
Por predefinição, apenas a saída do último participante surge como um evento de fluxo de trabalho output. Defina intermediate_outputs=True para mostrar a saída de cada participante, além da saída final:
workflow = SequentialBuilder(
participants=[writer, reviewer, editor],
intermediate_outputs=True,
).build()
Pode gerir estes eventos em tempo real em modo de streaming:
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
update = event.data
author = update.author_name
if author != last_author:
if last_author is not None:
print() # Newline between different authors
print(f"{author}: {update.text}", end="", flush=True)
last_author = author
else:
print(update.text, end="", flush=True)
Orquestração Sequencial com Humano no Loop
As orquestrações sequenciais suportam interações entre humanos no ciclo de duas formas: aprovação de ferramentas para controlar chamadas sensíveis e pedido de informação para pausar após cada resposta do agente para recolher feedback.
Sugestão
Para mais detalhes sobre o modelo de pedido e resposta, veja Humano-no-Loop.
Aprovação de Ferramentas em Fluxos de Trabalho Sequenciais
Use @tool(approval_mode="always_require") para marcar ferramentas que necessitem de aprovação humana antes da execução. O fluxo de trabalho pausa e emite um request_info evento quando o agente tenta chamar a ferramenta.
@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
return f"Query executed successfully: {query}"
database_agent = Agent(
client=chat_client,
name="DatabaseAgent",
instructions="You are a database assistant.",
tools=[execute_database_query],
)
workflow = SequentialBuilder(participants=[database_agent]).build()
Processar o fluxo de eventos e tratar dos pedidos de aprovação:
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info" and event.data.type == "function_approval_request":
responses[event.request_id] = event.data.to_function_approval_response(approved=True)
return responses if responses else None
stream = workflow.run("Check the schema and update all pending orders", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Sugestão
Para um exemplo completo executável, veja sequential_builder_tool_approval.py. A aprovação de ferramentas funciona com SequentialBuilder sem qualquer configuração adicional do construtor.
Solicitar Informação para Feedback do Agente
Use .with_request_info() para pausar após a resposta de agentes específicos, permitindo a entrada externa (como revisão humana) antes do início do próximo agente:
drafter = Agent(
client=chat_client,
name="drafter",
instructions="You are a document drafter. Create a brief draft on the given topic.",
)
editor = Agent(
client=chat_client,
name="editor",
instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)
finalizer = Agent(
client=chat_client,
name="finalizer",
instructions="You are a finalizer. Create a polished final version.",
)
# Enable request info for the editor agent only
workflow = (
SequentialBuilder(participants=[drafter, editor, finalizer])
.with_request_info(agents=["editor"])
.build()
)
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info":
responses[event.request_id] = AgentRequestInfoResponse.approve()
return responses if responses else None
stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Sugestão
Veja os exemplos completos: aprovação sequencial de ferramentas e informações sobre pedidos sequenciais.
Conceitos-chave
- Contexto Partilhado: Por defeito, cada agente consome toda a conversa do agente anterior, incluindo mensagens de entrada e resposta
-
Controlo de Contexto: Usar
chain_only_agent_responses=Truepara configurar agentes para consumirem apenas as mensagens de resposta do agente anterior -
Saída AgentResponse: A saída terminal do fluxo de trabalho contém
AgentResponsea resposta do último agente (não a conversa completa) -
Order Matters: Os agentes executam estritamente na ordem especificada na
participantslista - Participantes flexíveis: Você pode misturar agentes e executores personalizados em qualquer ordem
-
Contrato Personalizado de Terminadores: Um executor personalizado usado como último participante deve chamar
ctx.yield_output(AgentResponse(...))para produzir a saída do terminal -
Saídas Intermédias: Defina
intermediate_outputs=Truepara mostrar a saída de cada participante como um evento de workflowoutput, não apenas a do último participante -
Aprovação de Ferramentas: Utilização
@tool(approval_mode="always_require")para operações sensíveis que necessitam de revisão humana -
Solicitar Informação: Use
.with_request_info(agents=[...])para fazer pausas após agentes específicos para obter feedback externo