Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
A orquestração simultânea permite que vários agentes trabalhem na mesma tarefa em paralelo. Cada agente processa a entrada de forma independente, e seus resultados são coletados e agregados. Essa abordagem é adequada para cenários onde diversas perspetivas ou soluções são valiosas, como brainstorming, raciocínio conjunto ou sistemas de votação.
O que você vai aprender
- Como definir vários agentes com diferentes conhecimentos
- Como orquestrar esses agentes para trabalhar simultaneamente em uma única tarefa
- Como recolher e processar os resultados
Na orquestração simultânea, vários agentes trabalham na mesma tarefa de forma simultânea e independente, fornecendo perspetivas diversas sobre a mesma entrada.
Configurar o Cliente OpenAI do Azure
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Advertência
DefaultAzureCredential é conveniente para o desenvolvimento, mas requer uma consideração cuidadosa na produção. Em produção, considere usar uma credencial específica (por exemplo, ManagedIdentityCredential) para evitar problemas de latência, sondagens não intencionais de credenciais e potenciais riscos de segurança provenientes de mecanismos de recurso.
Defina seus agentes
Crie vários agentes especializados que trabalharão na mesma tarefa simultaneamente:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
Configurar a orquestração simultânea
Crie o fluxo de trabalho usando AgentWorkflowBuilder para executar agentes em paralelo:
// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);
Executar o fluxo de trabalho simultâneo e coletar resultados
Execute o fluxo de trabalho e processe eventos de todos os agentes em execução simultaneamente:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Update.Text}");
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Saída de amostra
French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!
===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!
Conceitos-chave
- Execução paralela: Todos os agentes processam a entrada de forma simultânea e independente
- AgentWorkflowBuilder.BuildConcurrent(): cria um fluxo de trabalho simultâneo a partir de uma coleção de agentes
- Agregação Automática: Os resultados de todos os agentes são automaticamente recolhidos no resultado final
-
Event Streaming: Monitoramento em tempo real do progresso do agente através
AgentResponseUpdateEvent - Perspetivas diversas: Cada agente traz sua experiência única para o mesmo problema
Os agentes são entidades especializadas que podem processar tarefas. O seguinte código define três agentes: um especialista em investigação, um especialista em marketing e um especialista jurídico.
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create three domain agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
researcher = chat_client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = chat_client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = chat_client.as_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
Configurar a orquestração simultânea
A ConcurrentBuilder classe permite que você construa um fluxo de trabalho para executar vários agentes em paralelo. Você passa a lista de agentes para serem participantes.
from agent_framework.orchestrations import ConcurrentBuilder
# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
Executar o fluxo de trabalho simultâneo e coletar os resultados
O agregador predefinido produz uma única AgentResponse mensagem contendo uma mensagem assistente por participante:
from agent_framework import AgentResponse
# 3) Run with a single prompt and print the aggregated agent responses
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Aggregated Results =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
name = msg.author_name or "assistant"
print(f"{'-' * 60}\n\n[{name}]:\n{msg.text}")
Saída de amostra
===== Final Aggregated Results =====
------------------------------------------------------------
[researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
...
------------------------------------------------------------
[marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
...
------------------------------------------------------------
[legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
Avançado: Executores de agente personalizáveis
A orquestração simultânea suporta executores personalizados que envolvem agentes com lógica adicional. Isso é útil quando você precisa de mais controle sobre como os agentes são inicializados e como eles processam as solicitações:
Definir Executores de Agente Personalizado
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
Agent,
Executor,
WorkflowContext,
handler,
)
class ResearcherExec(Executor):
def __init__(self, chat_client: FoundryChatClient, id: str = "researcher"):
self.agent = chat_client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class MarketerExec(Executor):
def __init__(self, chat_client: FoundryChatClient, id: str = "marketer"):
self.agent = chat_client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
Crie um fluxo de trabalho com executores personalizados
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
Avançado: Agregador personalizado
Por padrão, a orquestração concorrente agrega todas as respostas dos agentes num único AgentResponse, com uma mensagem de assistente por participante. Você pode substituir esse comportamento por um agregador personalizado que processa os resultados de uma maneira específica:
Definir um agregador personalizado
from agent_framework import AgentExecutorResponse
# Create a summarizer agent for the aggregator
summarizer_agent = chat_client.as_agent(
instructions=(
"You are a helpful assistant that consolidates multiple domain expert outputs "
"into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
),
name="summarizer",
)
# Define a custom aggregator callback
async def summarize_results(results: list[AgentExecutorResponse]) -> str:
# Extract one final assistant message per agent
expert_sections: list[str] = []
for r in results:
try:
messages = getattr(r.agent_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{r.executor_id}:\n{final_text}")
except Exception as e:
expert_sections.append(f"{r.executor_id}: (error: {type(e).__name__}: {e})")
# Ask the model to synthesize a concise summary of the experts' outputs
prompt = "\n\n".join(expert_sections)
response = await summarizer_agent.run(prompt)
# Return the model's final assistant text as the completion result
return response.messages[-1].text if response.messages else ""
Criar um fluxo de trabalho com o agregador personalizado
workflow = (
ConcurrentBuilder(participants=[researcher, marketer, legal])
.with_aggregator(summarize_results)
.build()
)
output = None
async for event in workflow.run("We are launching a new budget-friendly electric bike for urban commuters.", stream=True):
if event.type == "output":
output = event.data
if output:
print("===== Final Consolidated Output =====")
print(output)
Saída de exemplo com agregador personalizado
===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.
Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.
Saídas Intermédias
Por padrão, apenas a saída do agregador surge como um evento de fluxo de trabalho output. Defina intermediate_outputs=True também para mostrar a saída individual de cada participante:
workflow = ConcurrentBuilder(
participants=[researcher, marketer, legal],
intermediate_outputs=True,
).build()
Pode gerir estes eventos em tempo real em modo de streaming:
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Analyze our new product launch strategy.", stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
update = event.data
author = update.author_name
if author != last_author:
if last_author is not None:
print() # Newline between different authors
print(f"{author}: {update.text}", end="", flush=True)
last_author = author
else:
print(update.text, end="", flush=True)
Conceitos-chave
- Execução paralela: Todos os agentes trabalham na tarefa de forma simultânea e independente
-
Saída AgentResponse: O agregador padrão produz uma única
AgentResponsecom uma mensagem de assistente por participante (sem comando de utilizador incluído) - Perspetivas diversas: Cada agente traz sua experiência única para o mesmo problema
- Participantes flexíveis: Você pode usar agentes diretamente ou envolvê-los em executores personalizados
- Processamento personalizado: substitua o agregador padrão para sintetizar resultados de maneiras específicas do domínio
-
Saídas Intermédias: Definir
intermediate_outputs=Truepara mostrar a saída individual de cada participante, além da saída final do agregador