Fluxos de Trabalho do Microsoft Agent Framework - Utilização de Fluxos de Trabalho como Agentes

Este documento fornece uma visão geral de como usar Fluxos de Trabalho como Agentes no Microsoft Agent Framework.

Visão geral

Por vezes, você construiu um fluxo de trabalho sofisticado com múltiplos agentes, executores personalizados e lógica complexa, mas quer usá-lo como qualquer outro agente. É exatamente isso que os agentes de workflow permitem fazer. Ao envolver o seu fluxo de trabalho como um Agent, pode interagir com ele através da mesma API familiar que usaria para um simples agente de chat.

Principais Benefícios

  • Interface Unificada: Interagir com fluxos de trabalho complexos usando a mesma API que agentes simples
  • Compatibilidade de API: Integrar fluxos de trabalho com sistemas existentes que suportem a interface do Agente
  • Componibilidade: Utilize agentes de workflow como blocos de construção em sistemas de agentes maiores ou outros fluxos de trabalho
  • Gestão de Sessões: Utilize as sessões dos agentes para o estado da conversa e a retomada de sessão
  • Suporte a Streaming: Receba atualizações em tempo real à medida que o fluxo de trabalho é executado

Como funciona

Quando convertes um fluxo de trabalho para agente:

  1. O fluxo de trabalho é validado para garantir que o executor de início pode aceitar os tipos de entrada necessários
  2. Uma sessão é criada para gerir o estado da conversa
  3. As mensagens de entrada são encaminhadas para o executor inicial do fluxo de trabalho
  4. Os eventos do fluxo de trabalho são convertidos em atualizações de resposta do agente
  5. Pedidos de entrada externos (de RequestInfoExecutor) são apresentados como chamadas de função

Requerimentos

Para usar um fluxo de trabalho como agente, o executor inicial do fluxo de trabalho deve ser capaz de tratar IEnumerable<ChatMessage> como entrada. Isto é automaticamente satisfeito ao usar executores baseados em agentes criados com AsAIAgent.

Criar um Agente de Fluxo de Trabalho

Use o método de AsAIAgent() extensão para converter qualquer fluxo de trabalho compatível num agente:

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

// Create agents
AIAgent researchAgent = chatClient.AsAIAgent("You are a researcher. Research and gather information on the given topic.");
AIAgent writerAgent = chatClient.AsAIAgent("You are a writer. Write clear, engaging content based on research.");
AIAgent reviewerAgent = chatClient.AsAIAgent("You are a reviewer. Review the content and provide a final polished version.");

// Build a sequential workflow
var workflow = new WorkflowBuilder(researchAgent)
    .AddEdge(researchAgent, writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAIAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Parâmetros AsAIAgent

Parâmetro Tipo Description
id string? Identificador único opcional para o agente. Gerado automaticamente se não for fornecido.
name string? Nome de exibição opcional para o agente.
description string? Descrição opcional do propósito do agente.
executionEnvironment IWorkflowExecutionEnvironment? Ambiente de execução opcional. Por defeito, InProcessExecution.OffThread ou InProcessExecution.Concurrent com base na configuração do fluxo de trabalho.
includeExceptionDetails bool Se true, inclui mensagens de exceção no conteúdo de erro. O padrão é false.
includeWorkflowOutputsInResponse bool Se true, transforma os resultados do fluxo de trabalho de saída em conteúdo nas respostas dos agentes. O padrão é false.

Utilização de Agentes de Fluxo de Trabalho

Criar uma Sessão

Cada conversa com um agente de workflow requer uma sessão para gerir o estado:

// Create a new session for the conversation
AgentSession session = await workflowAgent.CreateSessionAsync();

Execução Não-Streaming

Para casos de uso simples onde quer a resposta completa:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentResponse response = await workflowAgent.RunAsync(messages, session);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Execução em Streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Gestão de Pedidos de Entrada Externos

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), estes pedidos são apresentados como chamadas de função na resposta do agente:

await foreach (AgentResponseUpdate update in workflowAgent.RunStreamingAsync(messages, session))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serialização e Retomada da Sessão

As sessões do agente de workflow podem ser serializadas para persistência e retomadas posteriormente:

// Serialize the session state
JsonElement serializedSession = await workflowAgent.SerializeSessionAsync(session);

// Store serializedSession to your persistence layer...

// Later, resume the session
AgentSession resumedSession = await workflowAgent.DeserializeSessionAsync(serializedSession);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedSession))
{
    Console.Write(update.Text);
}

Requerimentos

Para usar um fluxo de trabalho como agente, o executor inicial do fluxo de trabalho deve ser capaz de gerir a introdução de mensagens. Isto é satisfeito automaticamente ao utilizar executores baseados em agentes Agent.

Criando um agente de fluxo de trabalho

Recorra as_agent() a qualquer fluxo de trabalho compatível para o converter num agente:

from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential

# Create your chat client and agents
client = FoundryChatClient(
    project_endpoint="<your-endpoint>",
    model="<your-deployment>",
    credential=AzureCliCredential(),
)

researcher = client.as_agent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
)

writer = client.as_agent(
    name="Writer",
    instructions="Write clear, engaging content based on research.",
)

# Build a sequential workflow
workflow = SequentialBuilder(participants=[researcher, writer]).build()

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

as_agent Parâmetros

Parâmetro Tipo Description
name str | None Nome de exibição opcional para o agente. Gerado automaticamente se não for fornecido.

Utilização de Agentes de Fluxo de Trabalho

Criar uma Sessão

Pode, opcionalmente, criar uma sessão para gerir o estado da conversa ao longo de vários turnos:

# Create a new session for the conversation
session = await workflow_agent.create_session()

Observação

As sessões são opcionais. Se não passar a session a run(), o agente trata do estado internamente. Se workflow.as_agent() for criado sem context_providers, o framework adiciona por defeito um InMemoryHistoryProvider() histórico de múltiplos turnos logo à partida. Se passar context_providers explicitamente, essa lista é usada como está.

Execução Não-Streaming

Para casos de uso simples onde quer a resposta completa:

# You can pass a plain string as input
response = await workflow_agent.run("Write an article about AI trends")

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Execução em Streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

async for update in workflow_agent.run(
    "Write an article about AI trends",
    stream=True,
):
    if update.text:
        print(update.text, end="", flush=True)

Gestão de Pedidos de Entrada Externos

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando request_info), estes pedidos são apresentados como chamadas de função na resposta do agente. A chamada de função utiliza o nome WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:

from agent_framework import Content, Message, WorkflowAgent

response = await workflow_agent.run("Process my request")

# Look for function calls in the response
human_review_function_call = None
for message in response.messages:
    for content in message.contents:
        if content.name == WorkflowAgent.REQUEST_INFO_FUNCTION_NAME:
            human_review_function_call = content

Fornecer Respostas a Pedidos Pendentes

Para continuar a execução do fluxo de trabalho após um pedido de entrada externo, crie um resultado de função e envie-o de volta:

if human_review_function_call:
    # Parse the request arguments
    request = WorkflowAgent.RequestInfoFunctionArgs.from_json(
        human_review_function_call.arguments
    )

    # Create a response (your custom response type)
    result_data = MyResponseType(approved=True, feedback="Looks good")

    # Create the function call result
    function_result = Content.from_function_result(
        call_id=human_review_function_call.call_id,
        result=result_data,
    )

    # Send the response back to continue the workflow
    response = await workflow_agent.run(Message("tool", [function_result]))

Exemplo completo

Aqui está um exemplo completo que demonstra um agente de workflow com saída de streaming:

import asyncio
import os

from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    client = FoundryChatClient(
        project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
        model=os.environ["FOUNDRY_MODEL"],
        credential=AzureCliCredential(),
    )

    # Create specialized agents
    researcher = client.as_agent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
    )

    writer = client.as_agent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
    )

    reviewer = client.as_agent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
    )

    # Build a sequential workflow
    workflow = SequentialBuilder(participants=[researcher, writer, reviewer]).build()

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Run the workflow
    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run(
        "Write about quantum computing",
        stream=True,
    ):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Compreender a Conversão de Eventos

Quando um fluxo de trabalho é executado como agente, os eventos do fluxo de trabalho são convertidos em respostas de agentes. O tipo de resposta depende da forma como chama run():

  • run(): Devolve um AgentResponse contendo o resultado completo após o fim do fluxo de trabalho
  • run(..., stream=True): Devolve um iterável assíncrono de AgentResponseUpdate objetos à medida que o fluxo de trabalho é executado, fornecendo atualizações em tempo real

Durante a execução, os eventos internos do fluxo de trabalho são mapeados para as respostas dos agentes da seguinte forma:

Evento de Fluxo de Trabalho Resposta do Agente
event.type == "output" Transmitido como AgentResponseUpdate (streaming) ou agregado em AgentResponse (não-streaming)
event.type == "request_info" Convertido para conteúdo de chamada de função usando WorkflowAgent.REQUEST_INFO_FUNCTION_NAME
Outros eventos Ignorado (apenas fluxo de trabalho interno)

Esta conversão permite-lhe usar a interface padrão do agente, mantendo ainda assim acesso a informações detalhadas do fluxo de trabalho quando necessário.

Casos de uso

1. Pipelines de Agentes Complexos

Envolva um fluxo de trabalho multi-agente como um único agente para uso em aplicações:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Composição do Agente

Use agentes de fluxo de trabalho como componentes em sistemas maiores:

  • Um agente de workflow pode ser usado como ferramenta por outro agente
  • Múltiplos agentes de fluxo de trabalho podem ser orquestrados em conjunto
  • Os agentes de workflow podem ser aninhados dentro de outros fluxos de trabalho

3. Integração de API

Expor fluxos de trabalho complexos através de APIs que esperam a interface padrão do Agente, possibilitando:

  • Interfaces de chat que utilizam fluxos de trabalho sofisticados no backend
  • Integração com sistemas baseados em agentes existentes
  • Migração gradual de agentes simples para fluxos de trabalho complexos

Próximas Etapas