Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Varning
DET funktionella arbetsflödes-API:et är experimentellt och kan komma att ändras eller tas bort i framtida versioner utan föregående meddelande.
Med API:et för funktionella arbetsflöden kan du skriva arbetsflöden som vanliga Python asynkrona funktioner. I stället för att definiera körklasser, kabelkanter, och med hjälp av WorkflowBuilder dekorerar du en async-funktion med @workflow och använder internt Python kontrollflöde – if/else, for loopar, asyncio.gather – för att uttrycka din logik.
En jämförelse sida vid sida med graf-API:et finns i Arbetsflödes-API:er i arbetsflödesöversikten.
@workflow dekoratör
Använd @workflow för en async funktion för att konvertera den till ett FunctionalWorkflow objekt:
from agent_framework import workflow
@workflow
async def text_pipeline(text: str) -> str:
upper = await to_upper_case(text)
return await reverse_text(upper)
Dekoratören @workflow stöder ett parametriserat formulär med valfria argument:
from agent_framework import InMemoryCheckpointStorage, workflow
storage = InMemoryCheckpointStorage()
@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
...
@workflow Parametrar
| Parameter | Typ | Description |
|---|---|---|
name |
str | None |
Visningsnamn för arbetsflödet. Återgår till funktionens __name__. |
description |
str | None |
Valfri beskrivning som kan läsas av människor. |
checkpoint_storage |
CheckpointStorage | None |
Standardlagring för att bevara stegresultat mellan körningar. Kan åsidosättas per anrop i run(). |
Arbetsflödesfunktionssignatur
Arbetsflödesfunktionens första parameter tar emot indata som skickas till .run(). Lägg bara till en ctx: WorkflowRunContext parameter när du behöver HITL, nyckel/värde-tillstånd eller anpassade händelser – annars är det valfritt:
# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
result = await process(data)
return result
# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
WorkflowRunContext upptäcks först genom typanteckningar, sedan genom parameternamnet ctx, så både ctx: WorkflowRunContext och en parameter utan typ ctx fungerar.
Köra ett arbetsflöde
Anropa .run() på det objekt som returneras av FunctionalWorkflow:
# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world") # str — the raw return value
# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text) # first output as a string
print(result.get_outputs()) # list of all outputs
print(result.get_final_state()) # WorkflowRunState.IDLE
run() Parametrar
| Parameter | Typ | Description |
|---|---|---|
message |
Any | None |
Indata skickades till arbetsflödesfunktionen som sitt första argument. |
stream |
bool |
Om True returnerar en ResponseStream som ger WorkflowEvent objekt. Standardinställningen är False. |
responses |
dict[str, Any] | None |
HITL-svar som har nyckelats av request_id. Används för att återuppta ett pausat arbetsflöde. |
checkpoint_id |
str | None |
Kontrollpunkt att återställa från. Kräver checkpoint_storage att anges. |
checkpoint_storage |
CheckpointStorage | None |
Åsidosätter standardlagringsuppsättningen på dekoratören för den här körningen. |
include_status_events |
bool |
Inkludera statusändringshändelser i det icke-strömmade resultatet. |
Exakt ett av message, responseseller checkpoint_id måste anges per anrop.
WorkflowRunResult
run() (icke-direktuppspelning) returnerar en WorkflowRunResult. Viktiga metoder:
| Metod/egenskap | Returns | Description |
|---|---|---|
.text |
str |
Första utdata som en sträng. Tom sträng om ingen sträng utdata. |
.get_outputs() |
list[Any] |
Alla utdata som genereras av arbetsflödet. |
.get_final_state() |
WorkflowRunState |
Slutligt körningstillstånd (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...). |
.get_request_info_events() |
list[WorkflowEvent] |
Väntande HITL-begäranden när tillståndet är IDLE_WITH_PENDING_REQUESTS. |
Streaming
Skicka stream=True för att ta emot händelser när de skapas:
from agent_framework import workflow
@workflow
async def data_pipeline(url: str) -> str:
raw = await fetch_data(url)
return await transform_data(raw)
# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
if event.type == "output":
print(f"Output: {event.data}")
# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")
Se python/samples/03-workflows/functional/basic_streaming_pipeline.py för ett fullständigt exempel.
@step dekoratör
@step är en valfri dekoratör som lägger till resultatcachelagring, utsläpp av händelser och kontrollpunkter för varje steg till enskilda asynkrona funktioner.
from agent_framework import step, workflow
@step
async def fetch_data(url: str) -> dict:
# expensive — hits a real API
return await http_get(url)
@workflow
async def pipeline(url: str) -> str:
raw = await fetch_data(url)
return process(raw)
Vad @step gör i ett arbetsflöde
-
Lagrar cachade resultat – resultatet lagras av
(step_name, call_index). Vid återupptagning eller återställning av kontrollpunkt inom HITL returneras resultatet av ett slutfört steg omedelbart från sin sparade status, istället för att köras om. -
Genererar händelser –
executor_invoked/executor_completed/executor_failedgenereras för observerbarhet. Vid en cacheträff genererasexecutor_bypassedistället. -
Sparar kontrollpunkter – om arbetsflödet har
checkpoint_storagesparas en kontrollpunkt när varje steg har slutförts. -
Injekterar
WorkflowRunContext— Om stegfunktionen deklarerar enctx: WorkflowRunContextparameter, injekteras den aktiva kontexten automatiskt.
Utanför ett arbetsflöde som körs @step är transparent – funktionen fungerar identiskt med dess odekorerade version, vilket gör den helt testbar i isolering.
När du ska använda @step
Använd @step på funktioner som är dyra att köra igen: agentanrop, externa API-begäranden eller någon åtgärd där omkörning vid CV skulle vara kostsam eller ha biverkningar. Vanliga funktioner (utan @step) fungerar fortfarande i @workflow. De körs bara igen när arbetsflödet återupptas.
from agent_framework import InMemoryCheckpointStorage, step, workflow
storage = InMemoryCheckpointStorage()
@step # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
return (await agent.run(prompt)).text
# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
return len(text) > 0
@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
draft = await call_llm(f"Write about: {topic}")
ok = await validate(draft)
return draft if ok else ""
@step accepterar också en name parameter:
@step(name="transform")
async def transform_data(raw: dict) -> str:
...
Se python/samples/03-workflows/functional/steps_and_checkpointing.py för ett fullständigt exempel.
WorkflowRunContext
WorkflowRunContext (kort alias: RunContext) är körningskontexten som matas in i arbetsflödes- och stegfunktioner. Du behöver den bara när du använder HITL, nyckel/värde-tillstånd eller anpassade händelser.
Importera den från agent_framework:
from agent_framework import WorkflowRunContext, workflow
ctx.request_info() — Människor i processen
ctx.request_info() pausar arbetsflödet för att vänta på externa indata:
@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
draft = await write_draft(topic)
feedback = await ctx.request_info(
{"draft": draft, "instructions": "Please review this draft"},
response_type=str,
request_id="review_request",
)
return await revise_draft(draft, feedback)
Parameters:
| Parameter | Typ | Description |
|---|---|---|
request_data |
Any |
Nyttolast som beskriver vilka indata som behövs (diktamen, Pydantisk modell, sträng, ...). |
response_type |
type |
Förväntad Python-typ av svaret. |
request_id |
str | None |
Stabil identifierare för den här begäran. Ett slumpmässigt UUID genereras om det utelämnas. |
Spela upp semantik: Vid den första körningen request_info() genererar en intern signal (aldrig synlig för din kod) som pausar arbetsflödet. Anroparen får ett WorkflowRunResult med get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Återuppta genom att anropa .run(responses={request_id: value}) – arbetsflödet körs igen överst och request_info() returnerar det angivna värdet omedelbart.
@step-dekorerade funktioner som kördes innan avstängningen returnerar sina cachelagrade resultat när de återupptas istället för att köra igen.
Hantera svaret:
# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
requests = result1.get_request_info_events()
print(requests[0].request_id) # "review_request"
# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)
Se python/samples/03-workflows/functional/hitl_review.py för ett fullständigt exempel.
ctx.request_info() stöds också i @step funktioner.
ctx.add_event() — Anpassade händelser
Använd ctx.add_event() för att generera programspecifika händelser tillsammans med ramverkslivscykelhändelser. Fullständig information och exempel finns i Generera anpassade händelser.
ctx.get_state()
/
ctx.set_state() — Nyckel/värde-tillstånd
Använd ctx.get_state() och ctx.set_state() för att lagra värden som bevaras mellan HITL-avbrott och som ingår i kontrollpunkter. Fullständig information finns i Arbetsflödestillstånd.
Tillståndsvärden måste vara JSON-serialiserbara när kontrollpunktslagring har konfigurerats.
ctx.is_streaming()
Returnerar True när den aktuella körningen startades med stream=True. Användbara i stegfunktioner som vill justera sitt beteende baserat på strömningsläge.
get_run_context()
Hämtar den aktiva WorkflowRunContext från var som helst i ett arbetsflöde som körs – användbart i hjälpfunktioner som inte deklarerar en ctx parameter:
from agent_framework import get_run_context
async def helper():
ctx = get_run_context()
if ctx is not None:
ctx.set_state("helper_ran", True)
Returnerar None när det anropas utanför ett arbetsflöde som körs.
Parallellitet med asyncio.gather
Använd standard Python samtidighet för fan-out/fan-in – inga ramverksprimitiver behövs.
import asyncio
from agent_framework import workflow
@workflow
async def research_pipeline(topic: str) -> str:
web, papers, news = await asyncio.gather(
research_web(topic),
research_papers(topic),
research_news(topic),
)
return await synthesize([web, papers, news])
asyncio.gather fungerar också när funktionerna är dekorerade med @step.
Se python/samples/03-workflows/functional/parallel_pipeline.py för ett fullständigt exempel.
Anropa agenter i arbetsflöden
Agentanrop fungerar som vanliga funktionsanrop i @workflow:
from agent_framework import Agent, workflow
writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)
@workflow
async def poem_workflow(topic: str) -> str:
poem = (await writer.run(f"Write a poem about: {topic}")).text
review = (await reviewer.run(f"Review this poem: {poem}")).text
return f"Poem:\n{poem}\n\nReview: {review}"
Lägg till @step i agentanropsfunktioner när du vill att deras resultat ska cachelagras över HITL-återupptagningar eller kontrollpunktsåterställningar.
from agent_framework import step
@step
async def write_poem(topic: str) -> str:
return (await writer.run(f"Write a poem about: {topic}")).text
Se python/samples/03-workflows/functional/agent_integration.py för ett fullständigt exempel.
.as_agent() – Använda ett arbetsflöde som agent
Omsluta ett FunctionalWorkflow som ett agentkompatibelt objekt med .as_agent():
from agent_framework import workflow
@workflow
async def poem_workflow(topic: str) -> str:
...
# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")
# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)
# Or use in a larger workflow or orchestration
.as_agent() returnerar ett FunctionalWorkflowAgent som exponerar samma run() gränssnitt som andra agentobjekt, vilket gör funktionella arbetsflöden komposterbara med alla system som accepterar agenter.
| Parameter | Typ | Description |
|---|---|---|
name |
str | None |
Visningsnamn för agenten. Standardvärdet är arbetsflödets namn. |
Se python/samples/03-workflows/functional/agent_integration.py för ett exempel.
Samples
Körbara exempel finns i följande exempelmappar:
-
python/samples/01-get-started/— Introduktionsexempel@workflow -
python/samples/03-workflows/functional/— Exempel på funktionella arbetsflöden med fullständig funktion
Nästa steg
Relaterade ämnen:
- Utförare – bearbetningsenheter i det grafbaserade API:et
- Human-in-the-loop – HITL i grafbaserade arbetsflöden
- Kontrollpunkter – kontrollpunktslagring och återuppta
- Händelser – typer av arbetsflödeshändelser
- Använda arbetsflöden som agenter
Det funktionella arbetsflödes-API:et är inte tillgängligt för C# just nu.