API de flux de travail fonctionnel

Avertissement

L’API de flux de travail fonctionnel est expérimentale et sujette à modification ou suppression dans les versions ultérieures sans préavis.

L’API de flux de travail fonctionnel vous permet d’écrire des flux de travail en tant que fonctions asynchrones Python simples. Au lieu de définir des classes d’exécuteur, des bords de câblage, et en utilisant WorkflowBuilder, vous décorez une fonction async avec @workflow et utilisez le flux de contrôle Python natif ( if/else, for boucles, asyncio.gather) pour exprimer votre logique.

Pour une comparaison côte à côte avec l’API de graphe, consultez les API de flux de travail dans la vue d’ensemble des flux de travail.

Décorateur @workflow

Appliquer @workflow à une fonction async pour la convertir en un objet FunctionalWorkflow.

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

Le @workflow décorateur prend en charge un formulaire paramétrable avec des arguments facultatifs :

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

Paramètres @workflow

Paramètre Type Description
name str | None Nom d'affichage du flux de travail. Par défaut, la fonction est __name__.
description str | None Description facultative lisible par l’homme.
checkpoint_storage CheckpointStorage | None Stockage par défaut pour conserver les résultats des étapes entre les exécutions. Peut être substitué par appel dans run().

Signature de la fonction de flux de travail

Le premier paramètre de la fonction de flux de travail reçoit l’entrée passée à .run(). Ajoutez un paramètre ctx: WorkflowRunContext uniquement si vous avez besoin de HITL, d’un état clé/valeur ou d'événements personnalisés. Sinon, il est facultatif :

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext est d'abord détecté par l'annotation de type, puis par le nom du paramètre ctx, donc le paramètre ctx: WorkflowRunContext et un paramètre nu ctx fonctionnent tous les deux.

Exécution d’un workflow

Appelez .run() sur l’objet FunctionalWorkflow retourné par @workflow :

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

Paramètres run()

Paramètre Type Description
message Any | None Entrée passée à la fonction de workflow comme premier argument.
stream bool Si True, retourne un ResponseStream qui génère des objets WorkflowEvent. La valeur par défaut est False.
responses dict[str, Any] | None Réponses HITL basées sur request_id. Utilisé pour reprendre un flux de travail suspendu.
checkpoint_id str | None Point de contrôle à partir duquel effectuer la restauration. checkpoint_storage doit être défini.
checkpoint_storage CheckpointStorage | None Remplace le stockage par défaut défini sur le décorateur pour cette exécution.
include_status_events bool Incluez les événements de changement d'état dans le résultat hors diffusion en continu.

Exactement l’un des message, responsesou checkpoint_id doit être fourni par appel.

WorkflowRunResult

run() (non-streaming) retourne un WorkflowRunResult. Méthodes clés :

Méthode / propriété Returns Description
.text str Première sortie sous forme de chaîne. Chaîne vide si aucune chaîne de caractères ne produit de sortie.
.get_outputs() list[Any] Toutes les sorties émises par le flux de travail.
.get_final_state() WorkflowRunState État d’exécution final (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED...).
.get_request_info_events() list[WorkflowEvent] Demandes HITL en attente lorsque l’état est IDLE_WITH_PENDING_REQUESTS.

Diffusion en continu

Passez stream=True pour recevoir des événements au fur et à mesure qu’ils sont générés :

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Consultez python/samples/03-workflows/functional/basic_streaming_pipeline.py pour un exemple complet.

Décorateur @step

@step est un décorateur optionnel qui ajoute la mise en cache des résultats, l’émission d’événements et le point de contrôle à chaque étape aux fonctions asynchrones individuelles.

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Que @step fait-il dans un workflow

  • Met en cache les résultats : le résultat est stocké par (step_name, call_index). Lors de la restauration de point de contrôle ou de reprise HITL, une étape terminée retourne instantanément son résultat enregistré au lieu de réexécuter.
  • Émet des événementsexecutor_invoked / executor_completed / executor_failed sont émis pour l’observabilité. Lors d'un cache hit, executor_bypassed est émis à la place.
  • Enregistre les points de contrôle : si le flux de travail a checkpoint_storage, un point de contrôle est enregistré une fois chaque étape terminée.
  • Injecte WorkflowRunContext — si la fonction d’étape déclare un ctx: WorkflowRunContext paramètre, le contexte actif est automatiquement injecté.

En dehors d’un flux de travail en cours d’exécution, @step la fonction se comporte de la même manière que sa version non décorée, ce qui le rend entièrement testable en isolation.

Quand utiliser @step

Utiliser @step sur les fonctions coûteuses à réexécuter : appels d’agent, demandes d’API externes ou toute opération où la réexécution lors de la reprise serait coûteuse ou aurait des effets secondaires. Les fonctions simples (sans @step) fonctionnent toujours à l’intérieur @workflow; elles s’exécutent simplement à nouveau lorsque le flux de travail reprend.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step accepte également un name paramètre :

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Consultez python/samples/03-workflows/functional/steps_and_checkpointing.py pour un exemple complet.

WorkflowRunContext

WorkflowRunContext (alias court : RunContext) est le contexte d’exécution injecté dans les fonctions de flux de travail et d’étape. Vous n’en avez besoin que lorsque vous utilisez HITL, l’état clé/valeur ou les événements personnalisés.

Importez-le à partir de agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Humain dans la boucle

ctx.request_info() suspend le flux de travail pour attendre l’entrée externe :

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Paramètres :

Paramètre Type Description
request_data Any Charge utile décrivant l’entrée nécessaire (dict, modèle Pydantic, chaîne, ...).
response_type type Type de réponse Python attendu.
request_id str | None Identificateur stable pour cette requête. Un UUID aléatoire est généré s’il est omis.

Sémantique de relecture : Lors de la première exécution, request_info() déclenche un signal interne (jamais visible par votre code) qui suspend le flux de travail. L’appelant reçoit un WorkflowRunResult avec get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Reprendre en appelant .run(responses={request_id: value}) : le flux de travail s’exécute à nouveau à partir du haut et request_info() retourne immédiatement la valeur fournie.

Les fonctions @step- décorées qui ont été exécutées avant la suspension retournent leurs résultats mis en cache lors de la reprise au lieu d'être réexécutées.

Gestion de la réponse :

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Consultez python/samples/03-workflows/functional/hitl_review.py pour un exemple complet.

ctx.request_info() est également pris en charge à l'intérieur des fonctions @step.

ctx.add_event() — Événements personnalisés

Permet ctx.add_event() d’émettre des événements spécifiques à l’application en même temps que des événements de cycle de vie du framework. Pour obtenir des détails et des exemples complets, consultez Émission d’événements personnalisés.

ctx.get_state() / ctx.set_state() — État clé/valeur

Utilisez ctx.get_state() et ctx.set_state() pour stocker des valeurs qui persistent entre les interruptions HITL et qui sont incluses dans les points de contrôle. Pour plus d’informations, consultez l’état du flux de travail.

Les valeurs d’état doivent être sérialisables JSON lorsque le stockage de point de contrôle est configuré.

ctx.is_streaming()

Retourne True lorsque l’exécution actuelle a été démarrée avec stream=True. Utile à l’intérieur des fonctions qui souhaitent ajuster leur comportement en fonction du mode de diffusion en continu.

get_run_context()

Récupère l’actif WorkflowRunContext n’importe où à l’intérieur d’un workflow en cours d’exécution, utile dans les fonctions d'aide qui ne déclarent pas ctx paramètre :

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Retourne une valeur None lorsqu’elle est appelée en dehors d’un flux de travail en cours d’exécution.

Parallélisme avec asyncio.gather

Utilisez la concurrence standard Python pour fan-out/fan-in : aucune primitive d’infrastructure n’est nécessaire :

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather fonctionne également lorsque les fonctions sont décorées avec @step.

Consultez python/samples/03-workflows/functional/parallel_pipeline.py pour un exemple complet.

Appel d’agents dans des processus de travail

Les appels d’agent fonctionnent en tant qu’appels de fonction simples à l’intérieur de @workflow.

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Ajoutez @step aux fonctions d'appel d'agent lorsque vous souhaitez mettre en cache leurs résultats pendant les reprises ou restaurations de points de contrôle dans les processus HITL.

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Consultez python/samples/03-workflows/functional/agent_integration.py pour un exemple complet.

.as_agent() — Utilisation d’un flux de travail en tant qu’agent

Encapsulez un objet FunctionalWorkflow compatible avec l'agent avec .as_agent():

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() retourne un FunctionalWorkflowAgent qui expose la même run() interface que d’autres objets d’agent, ce qui rend les workflows fonctionnels composables avec n’importe quel système qui accepte les agents.

Paramètre Type Description
name str | None Nom affiché pour l’agent. Le nom du flux de travail est utilisé par défaut.

Consultez python/samples/03-workflows/functional/agent_integration.py pour un exemple.

Samples

Les exemples exécutables se trouvent dans les exemples de dossiers suivants :

Étapes suivantes

Rubriques connexes :

L’API de flux de travail fonctionnel n’est pas disponible pour C# pour l’instant.