Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Avertissement
L’API de flux de travail fonctionnel est expérimentale et sujette à modification ou suppression dans les versions ultérieures sans préavis.
L’API de flux de travail fonctionnel vous permet d’écrire des flux de travail en tant que fonctions asynchrones Python simples. Au lieu de définir des classes d’exécuteur, des bords de câblage, et en utilisant WorkflowBuilder, vous décorez une fonction async avec @workflow et utilisez le flux de contrôle Python natif ( if/else, for boucles, asyncio.gather) pour exprimer votre logique.
Pour une comparaison côte à côte avec l’API de graphe, consultez les API de flux de travail dans la vue d’ensemble des flux de travail.
Décorateur @workflow
Appliquer @workflow à une fonction async pour la convertir en un objet FunctionalWorkflow.
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
Le @workflow décorateur prend en charge un formulaire paramétrable avec des arguments facultatifs :
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
Paramètres @workflow
| Paramètre | Type | Description |
|---|---|---|
name |
str | None |
Nom d'affichage du flux de travail. Par défaut, la fonction est __name__. |
description |
str | None |
Description facultative lisible par l’homme. |
checkpoint_storage |
CheckpointStorage | None |
Stockage par défaut pour conserver les résultats des étapes entre les exécutions. Peut être substitué par appel dans run(). |
Signature de la fonction de flux de travail
Le premier paramètre de la fonction de flux de travail reçoit l’entrée passée à .run(). Ajoutez un paramètre ctx: WorkflowRunContext uniquement si vous avez besoin de HITL, d’un état clé/valeur ou d'événements personnalisés. Sinon, il est facultatif :
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext est d'abord détecté par l'annotation de type, puis par le nom du paramètre ctx, donc le paramètre ctx: WorkflowRunContext et un paramètre nu ctx fonctionnent tous les deux.
Exécution d’un workflow
Appelez .run() sur l’objet FunctionalWorkflow retourné par @workflow :
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
Paramètres run()
| Paramètre | Type | Description |
|---|---|---|
message |
Any | None |
Entrée passée à la fonction de workflow comme premier argument. |
stream |
bool |
Si True, retourne un ResponseStream qui génère des objets WorkflowEvent. La valeur par défaut est False. |
responses |
dict[str, Any] | None |
Réponses HITL basées sur request_id. Utilisé pour reprendre un flux de travail suspendu. |
checkpoint_id |
str | None |
Point de contrôle à partir duquel effectuer la restauration.
checkpoint_storage doit être défini. |
checkpoint_storage |
CheckpointStorage | None |
Remplace le stockage par défaut défini sur le décorateur pour cette exécution. |
include_status_events |
bool |
Incluez les événements de changement d'état dans le résultat hors diffusion en continu. |
Exactement l’un des message, responsesou checkpoint_id doit être fourni par appel.
WorkflowRunResult
run() (non-streaming) retourne un WorkflowRunResult. Méthodes clés :
| Méthode / propriété | Returns | Description |
|---|---|---|
.text |
str |
Première sortie sous forme de chaîne. Chaîne vide si aucune chaîne de caractères ne produit de sortie. |
.get_outputs() |
list[Any] |
Toutes les sorties émises par le flux de travail. |
.get_final_state() |
WorkflowRunState |
État d’exécution final (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED...). |
.get_request_info_events() |
list[WorkflowEvent] |
Demandes HITL en attente lorsque l’état est IDLE_WITH_PENDING_REQUESTS. |
Diffusion en continu
Passez stream=True pour recevoir des événements au fur et à mesure qu’ils sont générés :
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Consultez python/samples/03-workflows/functional/basic_streaming_pipeline.py pour un exemple complet.
Décorateur @step
@step est un décorateur optionnel qui ajoute la mise en cache des résultats, l’émission d’événements et le point de contrôle à chaque étape aux fonctions asynchrones individuelles.
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Que @step fait-il dans un workflow
-
Met en cache les résultats : le résultat est stocké par
(step_name, call_index). Lors de la restauration de point de contrôle ou de reprise HITL, une étape terminée retourne instantanément son résultat enregistré au lieu de réexécuter. -
Émet des événements —
executor_invoked/executor_completed/executor_failedsont émis pour l’observabilité. Lors d'un cache hit,executor_bypassedest émis à la place. -
Enregistre les points de contrôle : si le flux de travail a
checkpoint_storage, un point de contrôle est enregistré une fois chaque étape terminée. -
Injecte
WorkflowRunContext— si la fonction d’étape déclare unctx: WorkflowRunContextparamètre, le contexte actif est automatiquement injecté.
En dehors d’un flux de travail en cours d’exécution, @step la fonction se comporte de la même manière que sa version non décorée, ce qui le rend entièrement testable en isolation.
Quand utiliser @step
Utiliser @step sur les fonctions coûteuses à réexécuter : appels d’agent, demandes d’API externes ou toute opération où la réexécution lors de la reprise serait coûteuse ou aurait des effets secondaires. Les fonctions simples (sans @step) fonctionnent toujours à l’intérieur @workflow; elles s’exécutent simplement à nouveau lorsque le flux de travail reprend.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step accepte également un name paramètre :
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Consultez python/samples/03-workflows/functional/steps_and_checkpointing.py pour un exemple complet.
WorkflowRunContext
WorkflowRunContext (alias court : RunContext) est le contexte d’exécution injecté dans les fonctions de flux de travail et d’étape. Vous n’en avez besoin que lorsque vous utilisez HITL, l’état clé/valeur ou les événements personnalisés.
Importez-le à partir de agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Humain dans la boucle
ctx.request_info() suspend le flux de travail pour attendre l’entrée externe :
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Paramètres :
| Paramètre | Type | Description |
|---|---|---|
request_data |
Any |
Charge utile décrivant l’entrée nécessaire (dict, modèle Pydantic, chaîne, ...). |
response_type |
type |
Type de réponse Python attendu. |
request_id |
str | None |
Identificateur stable pour cette requête. Un UUID aléatoire est généré s’il est omis. |
Sémantique de relecture : Lors de la première exécution, request_info() déclenche un signal interne (jamais visible par votre code) qui suspend le flux de travail. L’appelant reçoit un WorkflowRunResult avec get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Reprendre en appelant .run(responses={request_id: value}) : le flux de travail s’exécute à nouveau à partir du haut et request_info() retourne immédiatement la valeur fournie.
Les fonctions @step- décorées qui ont été exécutées avant la suspension retournent leurs résultats mis en cache lors de la reprise au lieu d'être réexécutées.
Gestion de la réponse :
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Consultez python/samples/03-workflows/functional/hitl_review.py pour un exemple complet.
ctx.request_info() est également pris en charge à l'intérieur des fonctions @step.
ctx.add_event() — Événements personnalisés
Permet ctx.add_event() d’émettre des événements spécifiques à l’application en même temps que des événements de cycle de vie du framework. Pour obtenir des détails et des exemples complets, consultez Émission d’événements personnalisés.
ctx.get_state()
/
ctx.set_state() — État clé/valeur
Utilisez ctx.get_state() et ctx.set_state() pour stocker des valeurs qui persistent entre les interruptions HITL et qui sont incluses dans les points de contrôle. Pour plus d’informations, consultez l’état du flux de travail.
Les valeurs d’état doivent être sérialisables JSON lorsque le stockage de point de contrôle est configuré.
ctx.is_streaming()
Retourne True lorsque l’exécution actuelle a été démarrée avec stream=True. Utile à l’intérieur des fonctions qui souhaitent ajuster leur comportement en fonction du mode de diffusion en continu.
get_run_context()
Récupère l’actif WorkflowRunContext n’importe où à l’intérieur d’un workflow en cours d’exécution, utile dans les fonctions d'aide qui ne déclarent pas ctx paramètre :
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Retourne une valeur None lorsqu’elle est appelée en dehors d’un flux de travail en cours d’exécution.
Parallélisme avec asyncio.gather
Utilisez la concurrence standard Python pour fan-out/fan-in : aucune primitive d’infrastructure n’est nécessaire :
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather fonctionne également lorsque les fonctions sont décorées avec @step.
Consultez python/samples/03-workflows/functional/parallel_pipeline.py pour un exemple complet.
Appel d’agents dans des processus de travail
Les appels d’agent fonctionnent en tant qu’appels de fonction simples à l’intérieur de @workflow.
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Ajoutez @step aux fonctions d'appel d'agent lorsque vous souhaitez mettre en cache leurs résultats pendant les reprises ou restaurations de points de contrôle dans les processus HITL.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Consultez python/samples/03-workflows/functional/agent_integration.py pour un exemple complet.
.as_agent() — Utilisation d’un flux de travail en tant qu’agent
Encapsulez un objet FunctionalWorkflow compatible avec l'agent avec .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() retourne un FunctionalWorkflowAgent qui expose la même run() interface que d’autres objets d’agent, ce qui rend les workflows fonctionnels composables avec n’importe quel système qui accepte les agents.
| Paramètre | Type | Description |
|---|---|---|
name |
str | None |
Nom affiché pour l’agent. Le nom du flux de travail est utilisé par défaut. |
Consultez python/samples/03-workflows/functional/agent_integration.py pour un exemple.
Samples
Les exemples exécutables se trouvent dans les exemples de dossiers suivants :
-
python/samples/01-get-started/— exemples d’introduction@workflow -
python/samples/03-workflows/functional/— Exemples de flux de travail fonctionnels complets
Étapes suivantes
Rubriques connexes :
- Exécuteurs : unités de traitement dans l’API basée sur graphique
- Human-in-the-loop — HITL dans les flux de travail basés sur des graphiques
- Points de contrôle — stockage des points de contrôle et reprise
- Événements : types d’événements de flux de travail
- Utilisation de flux de travail en tant qu’agents
L’API de flux de travail fonctionnel n’est pas disponible pour C# pour l’instant.