Orquestrações de fluxos de trabalho do Microsoft Agent Framework - Sequencial

Na orquestração sequencial, os agentes são organizados em um pipeline. Cada agente processa a tarefa por sua vez, passando sua saída para o próximo agente na sequência. Isso é ideal para fluxos de trabalho em que cada etapa se baseia na anterior, como revisão de documentos, pipelines de processamento de dados ou raciocínio em vários estágios.

Orquestração sequencial

Importante

Por defeito, cada agente na sequência consome toda a conversa do agente anterior — tanto as mensagens de entrada fornecidas ao agente anterior como as suas mensagens de resposta. Pode configurar os agentes para consumirem apenas as mensagens de resposta do agente anterior. Veja Controlo do Contexto Entre Agentes para mais detalhes.

O que você vai aprender

  • Como criar um pipeline sequencial de agentes
  • Como encadear agentes onde cada um se baseia na saída anterior
  • Como adicionar aprovação humana integrativa em chamadas sensíveis de ferramentas tecnológicas
  • Como misturar agentes com executores personalizados para tarefas especializadas
  • Como acompanhar o fluxo de conversação através do pipeline

Defina seus agentes

Na orquestração sequencial, os agentes são organizados em um pipeline onde cada agente processa a tarefa por sua vez, passando a saída para o próximo agente na sequência.

Configurar o Cliente OpenAI do Azure

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Advertência

DefaultAzureCredential é conveniente para o desenvolvimento, mas requer uma consideração cuidadosa na produção. Em produção, considere usar uma credencial específica (por exemplo, ManagedIdentityCredential) para evitar problemas de latência, sondagens não intencionais de credenciais e potenciais riscos de segurança provenientes de mecanismos de recurso.

Crie agentes especializados que trabalharão em sequência:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Configurar a orquestração sequencial

Crie o fluxo de trabalho usando AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Executar o fluxo de trabalho sequencial

Execute o fluxo de trabalho e processe os eventos:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Saída de amostra

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Orquestração Sequencial com Humano no Loop

Orquestrações sequenciais suportam interações humanas no ciclo através da aprovação da ferramenta. Quando os agentes usam ferramentas integradas com ApprovalRequiredAIFunction, o fluxo de trabalho pausa e emite um RequestInfoEvent contendo um ToolApprovalRequestContent. Sistemas externos (como um operador humano) podem inspecionar a chamada de ferramenta, aprová-la ou rejeitá-la, e o fluxo de trabalho retoma em conformidade.

Orquestração Sequencial com Humano no Loop

Sugestão

Para mais detalhes sobre o modelo de pedido e resposta, veja Humano-no-Loop.

Defina Agentes com Ferramentas que Requerem Aprovação

Criar agentes onde as ferramentas sensíveis são protegidas com ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Compilar e Executar com Gestão de Aprovações

Constrói o fluxo de trabalho sequencial normalmente. O fluxo de aprovação é gerido através do fluxo de eventos:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Observação

AgentWorkflowBuilder.BuildSequential() suporta aprovação de ferramentas logo de início — não é necessária configuração adicional. Quando um agente chama uma ferramenta envolvida com ApprovalRequiredAIFunction, o fluxo de trabalho pausa automaticamente e emite um RequestInfoEvent.

Sugestão

Para um exemplo completo e executável deste fluxo de aprovação, veja a GroupChatToolApproval amostra. O mesmo RequestInfoEvent padrão de condução aplica-se a outras orquestrações.

Conceitos-chave

  • Processamento sequencial: Cada agente processa a saída do agente anterior em ordem
  • AgentWorkflowBuilder.BuildSequential(): cria um fluxo de trabalho de pipeline a partir de uma coleção de agentes
  • ChatClientAgent: representa um agente apoiado por um cliente de chat com instruções específicas
  • InProcessExecution.RunStreamingAsync(): Executa o fluxo de trabalho e retorna um StreamingRun para streaming de eventos em tempo real
  • Manipulação de Eventos: Monitore o progresso através de AgentResponseUpdateEvent e a conclusão através de WorkflowOutputEvent
  • Aprovação de Ferramentas: Envolver ferramentas sensíveis com ApprovalRequiredAIFunction para exigir aprovação humana antes da execução
  • RequestInfoEvent: Emitido quando uma ferramenta necessita de aprovação; contém ToolApprovalRequestContent com os detalhes da chamada de ferramenta

Na orquestração sequencial, cada agente processa a tarefa por sua vez, com a saída fluindo de um para o outro. Comece por definir agentes para um processo em duas etapas:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Configurar a orquestração sequencial

A SequentialBuilder classe cria um pipeline onde os agentes processam tarefas em ordem. Cada agente vê o histórico de conversas completo e adiciona sua resposta:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Executar o fluxo de trabalho sequencial

Execute o fluxo de trabalho e recolha o resultado final. A saída do terminal é um AgentResponse contendo as mensagens de resposta do último agente:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Saída de amostra

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Avançado: Misturando agentes com executores personalizados

A orquestração sequencial suporta agentes de mistura com executores personalizados para processamento especializado. Isso é útil quando você precisa de uma lógica personalizada que não requer um LLM:

Definir um executor personalizado

Observação

Quando um executor personalizado segue um agente na sequência, o seu handler recebe um AgentExecutorResponse (porque os agentes são internamente encapsulados por AgentExecutor). Use agent_response.full_conversation para aceder ao histórico completo das conversas. Um executor personalizado usado como último participante (terminador) deve chamar ctx.yield_output(AgentResponse(...)) para que a sua saída se torne a saída terminal do fluxo de trabalho.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Crie um fluxo de trabalho sequencial misto

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Saída de exemplo com executor personalizado

===== Final Summary =====
Summary -> users:1 assistants:1

Controlo do Contexto Entre Agentes

Por defeito, cada agente num SequentialBuilder fluxo de trabalho consome toda a conversa do agente anterior (mensagens de entrada + resposta). A definição chain_only_agent_responses=True configura todos os agentes da sequência para consumirem apenas as mensagens de resposta do agente anterior:

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

Isto é útil para pipelines de tradução, refinamento progressivo e outros cenários onde cada agente deve focar-se exclusivamente em transformar a saída do agente anterior sem ser influenciado por turnos de conversa anteriores.

Para um exemplo completo, veja sequential_chain_only_agent_responses.py no repositório Agent Framework.

Sugestão

Para um controlo mais detalhado sobre o fluxo de contexto — incluindo funções de filtro personalizadas — veja Modos de Contexto na referência Agent Executor.

Saídas Intermédias

Por predefinição, apenas a saída do último participante surge como um evento de fluxo de trabalho output. Defina intermediate_outputs=True para mostrar a saída de cada participante, além da saída final:

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()

Pode gerir estes eventos em tempo real em modo de streaming:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Orquestração Sequencial com Humano no Loop

As orquestrações sequenciais suportam interações entre humanos no ciclo de duas formas: aprovação de ferramentas para controlar chamadas sensíveis e pedido de informação para pausar após cada resposta do agente para recolher feedback.

Orquestração Sequencial com Humano no Loop

Sugestão

Para mais detalhes sobre o modelo de pedido e resposta, veja Humano-no-Loop.

Aprovação de Ferramentas em Fluxos de Trabalho Sequenciais

Use @tool(approval_mode="always_require") para marcar ferramentas que necessitem de aprovação humana antes da execução. O fluxo de trabalho pausa e emite um request_info evento quando o agente tenta chamar a ferramenta.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Processar o fluxo de eventos e tratar dos pedidos de aprovação:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Sugestão

Para um exemplo completo executável, veja sequential_builder_tool_approval.py. A aprovação de ferramentas funciona com SequentialBuilder sem qualquer configuração adicional do construtor.

Solicitar Informação para Feedback do Agente

Use .with_request_info() para pausar após a resposta de agentes específicos, permitindo a entrada externa (como revisão humana) antes do início do próximo agente:

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Conceitos-chave

  • Contexto Partilhado: Por defeito, cada agente consome toda a conversa do agente anterior, incluindo mensagens de entrada e resposta
  • Controlo de Contexto: Usar chain_only_agent_responses=True para configurar agentes para consumirem apenas as mensagens de resposta do agente anterior
  • Saída AgentResponse: A saída terminal do fluxo de trabalho contém AgentResponse a resposta do último agente (não a conversa completa)
  • Order Matters: Os agentes executam estritamente na ordem especificada na participants lista
  • Participantes flexíveis: Você pode misturar agentes e executores personalizados em qualquer ordem
  • Contrato Personalizado de Terminadores: Um executor personalizado usado como último participante deve chamar ctx.yield_output(AgentResponse(...)) para produzir a saída do terminal
  • Saídas Intermédias: Defina intermediate_outputs=True para mostrar a saída de cada participante como um evento de workflow output , não apenas a do último participante
  • Aprovação de Ferramentas: Utilização @tool(approval_mode="always_require") para operações sensíveis que necessitam de revisão humana
  • Solicitar Informação: Use .with_request_info(agents=[...]) para fazer pausas após agentes específicos para obter feedback externo

Próximos passos