API för funktionellt arbetsflöde

Varning

DET funktionella arbetsflödes-API:et är experimentellt och kan komma att ändras eller tas bort i framtida versioner utan föregående meddelande.

Med API:et för funktionella arbetsflöden kan du skriva arbetsflöden som vanliga Python asynkrona funktioner. I stället för att definiera körklasser, kabelkanter, och med hjälp av WorkflowBuilder dekorerar du en async-funktion med @workflow och använder internt Python kontrollflöde – if/else, for loopar, asyncio.gather – för att uttrycka din logik.

En jämförelse sida vid sida med graf-API:et finns i Arbetsflödes-API:er i arbetsflödesöversikten.

@workflow dekoratör

Använd @workflow för en async funktion för att konvertera den till ett FunctionalWorkflow objekt:

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

Dekoratören @workflow stöder ett parametriserat formulär med valfria argument:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow Parametrar

Parameter Typ Description
name str | None Visningsnamn för arbetsflödet. Återgår till funktionens __name__.
description str | None Valfri beskrivning som kan läsas av människor.
checkpoint_storage CheckpointStorage | None Standardlagring för att bevara stegresultat mellan körningar. Kan åsidosättas per anrop i run().

Arbetsflödesfunktionssignatur

Arbetsflödesfunktionens första parameter tar emot indata som skickas till .run(). Lägg bara till en ctx: WorkflowRunContext parameter när du behöver HITL, nyckel/värde-tillstånd eller anpassade händelser – annars är det valfritt:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext upptäcks först genom typanteckningar, sedan genom parameternamnet ctx, så både ctx: WorkflowRunContext och en parameter utan typ ctx fungerar.

Köra ett arbetsflöde

Anropa .run() på det objekt som returneras av FunctionalWorkflow:

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() Parametrar

Parameter Typ Description
message Any | None Indata skickades till arbetsflödesfunktionen som sitt första argument.
stream bool Om True returnerar en ResponseStream som ger WorkflowEvent objekt. Standardinställningen är False.
responses dict[str, Any] | None HITL-svar som har nyckelats av request_id. Används för att återuppta ett pausat arbetsflöde.
checkpoint_id str | None Kontrollpunkt att återställa från. Kräver checkpoint_storage att anges.
checkpoint_storage CheckpointStorage | None Åsidosätter standardlagringsuppsättningen på dekoratören för den här körningen.
include_status_events bool Inkludera statusändringshändelser i det icke-strömmade resultatet.

Exakt ett av message, responseseller checkpoint_id måste anges per anrop.

WorkflowRunResult

run() (icke-direktuppspelning) returnerar en WorkflowRunResult. Viktiga metoder:

Metod/egenskap Returns Description
.text str Första utdata som en sträng. Tom sträng om ingen sträng utdata.
.get_outputs() list[Any] Alla utdata som genereras av arbetsflödet.
.get_final_state() WorkflowRunState Slutligt körningstillstånd (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED, ...).
.get_request_info_events() list[WorkflowEvent] Väntande HITL-begäranden när tillståndet är IDLE_WITH_PENDING_REQUESTS.

Streaming

Skicka stream=True för att ta emot händelser när de skapas:

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Se python/samples/03-workflows/functional/basic_streaming_pipeline.py för ett fullständigt exempel.

@step dekoratör

@step är en valfri dekoratör som lägger till resultatcachelagring, utsläpp av händelser och kontrollpunkter för varje steg till enskilda asynkrona funktioner.

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Vad @step gör i ett arbetsflöde

  • Lagrar cachade resultat – resultatet lagras av (step_name, call_index). Vid återupptagning eller återställning av kontrollpunkt inom HITL returneras resultatet av ett slutfört steg omedelbart från sin sparade status, istället för att köras om.
  • Genererar händelserexecutor_invoked / executor_completed / executor_failed genereras för observerbarhet. Vid en cacheträff genereras executor_bypassed istället.
  • Sparar kontrollpunkter – om arbetsflödet har checkpoint_storagesparas en kontrollpunkt när varje steg har slutförts.
  • Injekterar WorkflowRunContext — Om stegfunktionen deklarerar en ctx: WorkflowRunContext parameter, injekteras den aktiva kontexten automatiskt.

Utanför ett arbetsflöde som körs @step är transparent – funktionen fungerar identiskt med dess odekorerade version, vilket gör den helt testbar i isolering.

När du ska använda @step

Använd @step på funktioner som är dyra att köra igen: agentanrop, externa API-begäranden eller någon åtgärd där omkörning vid CV skulle vara kostsam eller ha biverkningar. Vanliga funktioner (utan @step) fungerar fortfarande i @workflow. De körs bara igen när arbetsflödet återupptas.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step accepterar också en name parameter:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Se python/samples/03-workflows/functional/steps_and_checkpointing.py för ett fullständigt exempel.

WorkflowRunContext

WorkflowRunContext (kort alias: RunContext) är körningskontexten som matas in i arbetsflödes- och stegfunktioner. Du behöver den bara när du använder HITL, nyckel/värde-tillstånd eller anpassade händelser.

Importera den från agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Människor i processen

ctx.request_info() pausar arbetsflödet för att vänta på externa indata:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parameters:

Parameter Typ Description
request_data Any Nyttolast som beskriver vilka indata som behövs (diktamen, Pydantisk modell, sträng, ...).
response_type type Förväntad Python-typ av svaret.
request_id str | None Stabil identifierare för den här begäran. Ett slumpmässigt UUID genereras om det utelämnas.

Spela upp semantik: Vid den första körningen request_info() genererar en intern signal (aldrig synlig för din kod) som pausar arbetsflödet. Anroparen får ett WorkflowRunResult med get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Återuppta genom att anropa .run(responses={request_id: value}) – arbetsflödet körs igen överst och request_info() returnerar det angivna värdet omedelbart.

@step-dekorerade funktioner som kördes innan avstängningen returnerar sina cachelagrade resultat när de återupptas istället för att köra igen.

Hantera svaret:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Se python/samples/03-workflows/functional/hitl_review.py för ett fullständigt exempel.

ctx.request_info() stöds också i @step funktioner.

ctx.add_event() — Anpassade händelser

Använd ctx.add_event() för att generera programspecifika händelser tillsammans med ramverkslivscykelhändelser. Fullständig information och exempel finns i Generera anpassade händelser.

ctx.get_state() / ctx.set_state() — Nyckel/värde-tillstånd

Använd ctx.get_state() och ctx.set_state() för att lagra värden som bevaras mellan HITL-avbrott och som ingår i kontrollpunkter. Fullständig information finns i Arbetsflödestillstånd.

Tillståndsvärden måste vara JSON-serialiserbara när kontrollpunktslagring har konfigurerats.

ctx.is_streaming()

Returnerar True när den aktuella körningen startades med stream=True. Användbara i stegfunktioner som vill justera sitt beteende baserat på strömningsläge.

get_run_context()

Hämtar den aktiva WorkflowRunContext från var som helst i ett arbetsflöde som körs – användbart i hjälpfunktioner som inte deklarerar en ctx parameter:

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Returnerar None när det anropas utanför ett arbetsflöde som körs.

Parallellitet med asyncio.gather

Använd standard Python samtidighet för fan-out/fan-in – inga ramverksprimitiver behövs.

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather fungerar också när funktionerna är dekorerade med @step.

Se python/samples/03-workflows/functional/parallel_pipeline.py för ett fullständigt exempel.

Anropa agenter i arbetsflöden

Agentanrop fungerar som vanliga funktionsanrop i @workflow:

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Lägg till @step i agentanropsfunktioner när du vill att deras resultat ska cachelagras över HITL-återupptagningar eller kontrollpunktsåterställningar.

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Se python/samples/03-workflows/functional/agent_integration.py för ett fullständigt exempel.

.as_agent() – Använda ett arbetsflöde som agent

Omsluta ett FunctionalWorkflow som ett agentkompatibelt objekt med .as_agent():

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() returnerar ett FunctionalWorkflowAgent som exponerar samma run() gränssnitt som andra agentobjekt, vilket gör funktionella arbetsflöden komposterbara med alla system som accepterar agenter.

Parameter Typ Description
name str | None Visningsnamn för agenten. Standardvärdet är arbetsflödets namn.

Se python/samples/03-workflows/functional/agent_integration.py för ett exempel.

Samples

Körbara exempel finns i följande exempelmappar:

Nästa steg

Relaterade ämnen:

Det funktionella arbetsflödes-API:et är inte tillgängligt för C# just nu.