Functionele werkstroom-API

Warning

De functionele werkstroom-API is experimenteel en kan in toekomstige versies zonder kennisgeving worden gewijzigd of verwijderd.

Met de functionele werkstroom-API kunt u werkstromen schrijven als gewone Python asynchrone functies. In plaats van uitvoerdersklassen te definiëren, bedradingsranden, en met WorkflowBuilder versiereert u een async functie met @workflow en gebruikt u systeemeigen Python controlestroom — if/else, for lussen, asyncio.gather — om uw logica uit te drukken.

Zie Werkstroom-API's in het overzicht van werkstromen voor een vergelijking naast elkaar met de grafiek-API.

@workflow decorateur

Toepassen @workflow op een async functie om deze te converteren naar een FunctionalWorkflow object:

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

De @workflow decorator ondersteunt een geparameteriseerd formulier met optionele argumenten:

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow Parameters

Parameter Type Description
name str | None Weergavenaam voor het proces. Standaard ingesteld op de __name__functie.
description str | None Optionele door mensen leesbare beschrijving.
checkpoint_storage CheckpointStorage | None Standaardopslag voor het behouden van stapresultaten tussen uitvoeringen. Kan in run() per aanroep worden overschreven.

Handtekening voor werkstroomfuncties

De eerste parameter van de werkstroomfunctie ontvangt de invoer die is doorgegeven aan .run(). Voeg alleen een ctx: WorkflowRunContext parameter toe als u HITL, sleutel/waardestatus of aangepaste gebeurtenissen nodig hebt. Anders is dit optioneel:

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext wordt eerst door typeaantekening gedetecteerd, vervolgens door de parameternaam ctx, zodat zowel ctx: WorkflowRunContext als een lege ctx parameter werken.

Een werkstroom uitvoeren

Roep .run() aan op het FunctionalWorkflow-object dat wordt geretourneerd door @workflow:

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() Parameters

Parameter Type Description
message Any | None Invoer die als eerste argument aan de werkstroomfunctie is doorgegeven.
stream bool Als True retourneert een ResponseStream die WorkflowEvent objecten oplevert. Wordt standaard ingesteld op False.
responses dict[str, Any] | None HITL-antwoorden die zijn gesleuteld door request_id. Wordt gebruikt om een onderbroken werkstroom te hervatten.
checkpoint_id str | None Controlepunt waaruit u wilt herstellen. Moet checkpoint_storage worden ingesteld.
checkpoint_storage CheckpointStorage | None Hiermee overschrijft u de standaardopslagset op de decorator voor deze uitvoering.
include_status_events bool Neem statuswijzigingsgebeurtenissen op in het resultaat zonder streaming.

Precies één van message, responsesof checkpoint_id moet per gesprek worden opgegeven.

WorkflowRunResult

run() (niet-streaming) retourneert een WorkflowRunResult. Belangrijkste methoden:

Methode/eigenschap Returns Description
.text str Eerste uitvoer als een tekenreeks. Lege tekenreeks indien er geen tekenreeksuitvoer aanwezig is.
.get_outputs() list[Any] Alle uitvoer die door de werkstroom wordt gegenereerd.
.get_final_state() WorkflowRunState Laatste uitvoeringsstatus (IDLE, IDLE_WITH_PENDING_REQUESTS, FAILED...).
.get_request_info_events() list[WorkflowEvent] In behandeling zijnde HITL-aanvragen wanneer de status is IDLE_WITH_PENDING_REQUESTS.

Streaming

Doorgeven stream=True om gebeurtenissen te ontvangen wanneer ze worden geproduceerd:

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

Zie python/samples/03-workflows/functional/basic_streaming_pipeline.py voor een volledig voorbeeld.

@step decorateur

@step is een opt-in decorator die resultatencache, gebeurtenisuitgifte en controlepunten per stap toevoegt aan afzonderlijke asynchrone functies:

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

Wat @step doet binnen een werkstroom?

  • Cacheresultaten : het resultaat wordt opgeslagen door (step_name, call_index). Bij HITL hervatten of controlepuntherstel retourneert een voltooide stap direct het opgeslagen resultaat, in plaats van opnieuw te worden uitgevoerd.
  • Verzendt gebeurtenissen: executor_invoked / executor_completed / executor_failed worden verzonden voor waarneembaarheid. Bij een cachetreffer wordt executor_bypassed in plaats daarvan verzonden.
  • Hiermee worden controlepunten opgeslagen: als de werkstroom een controlepunt heeft checkpoint_storage, wordt er een controlepunt opgeslagen nadat elke stap is voltooid.
  • Injecteert WorkflowRunContext — als de stapfunctie een ctx: WorkflowRunContext parameter declareert, wordt de actieve context automatisch geïnjecteerd.

Buiten een actieve werkstroom is @step transparant — de functie gedraagt zich identiek aan de ongedecoreerde versie, waardoor deze volledig testbaar is in isolatie.

Wanneer gebruikt u @step

Gebruik @step op functies die duur zijn om opnieuw uit te voeren: agentaanroepen, externe API-aanvragen of een bewerking waarbij opnieuw uitvoeren op hervatten kostbaar is of bijwerkingen heeft. Gewone functies (zonder @step) werken nog steeds binnen @workflow; ze worden gewoon opnieuw uitgevoerd wanneer de werkstroom wordt hervat.

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step accepteert ook een name parameter:

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

Zie python/samples/03-workflows/functional/steps_and_checkpointing.py voor een volledig voorbeeld.

WorkflowRunContext

WorkflowRunContext (korte alias: RunContext) is de uitvoeringscontext die wordt geïnjecteerd in workflows en stapfuncties. U hebt deze alleen nodig wanneer u HITL, sleutel/waardestatus of aangepaste gebeurtenissen gebruikt.

Importeer het uit agent_framework:

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — Mens-in-de-lus

ctx.request_info() onderbreekt de werkstroom om te wachten op externe invoer:

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

Parameters:

Parameter Type Description
request_data Any Payload die beschrijft welke invoer nodig is (dict, Pydantic-model, tekenreeks, ...).
response_type type Verwacht Python type antwoord.
request_id str | None Stabiele identifier voor deze aanvraag. Er wordt een willekeurige UUID gegenereerd als u dit weglaat.

Semantiek opnieuw afspelen: Bij de eerste uitvoering request_info() wordt een intern signaal (nooit zichtbaar voor uw code) weergegeven dat de werkstroom onderbreekt. De beller ontvangt een WorkflowRunResult met get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS. Hervatten door aan te roepen .run(responses={request_id: value}) : de werkstroom wordt opnieuw uitgevoerd vanaf de bovenkant en request_info() retourneert de opgegeven waarde onmiddellijk.

@step-versierde functies die vóór de onderbreking werden uitgevoerd, retourneren hun in de cache opgeslagen resultaten bij hervatting in plaats van opnieuw uit te voeren.

Het antwoord verwerken:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

Zie python/samples/03-workflows/functional/hitl_review.py voor een volledig voorbeeld.

ctx.request_info() wordt ook ondersteund in @step functies.

ctx.add_event() — Aangepaste gebeurtenissen

Gebruik ctx.add_event() dit om toepassingsspecifieke gebeurtenissen te verzenden naast levenscyclusgebeurtenissen van frameworks. Zie Aangepaste gebeurtenissen verzenden voor meer informatie en voorbeelden.

ctx.get_state() / ctx.set_state() — Sleutel/waarde toestand

Gebruik ctx.get_state() en ctx.set_state() sla waarden op die behouden blijven tijdens HITL-onderbrekingen en worden opgenomen in controlepunten. Zie de werkstroomstatus voor meer informatie.

Statuswaarden moeten JSON-serializeerbaar zijn wanneer controlepuntopslag is geconfigureerd.

ctx.is_streaming()

Retourneert True wanneer de huidige uitvoering is gestart met stream=True. Handige binnenstapfuncties die hun gedrag willen aanpassen op basis van de streamingmodus.

get_run_context()

Hiermee wordt de actieve WorkflowRunContext waarde opgehaald vanaf elke locatie in een actieve werkstroom, handig in helperfuncties die geen parameter declareren ctx :

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

Retourneert None wanneer deze wordt aangeroepen buiten een actieve werkstroom.

Parallellisme met asyncio.gather

Gebruik standaard Python-concurrentie voor fan-out/fan-in — frameworkprimitieven zijn niet nodig.

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather werkt ook wanneer de functies zijn ingericht met @step.

Zie python/samples/03-workflows/functional/parallel_pipeline.py voor een volledig voorbeeld.

Agents aanroepen binnen werkstromen

Agent-aanroepen werken als gewone functie-aanroepen binnen @workflow:

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

Voeg @step toe aan functies voor het aanroepen van agents wanneer u wilt dat de resultaten worden opgeslagen in de cache voor het hervatten van HITL-processen of het herstellen van controlepunten.

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

Zie python/samples/03-workflows/functional/agent_integration.py voor een volledig voorbeeld.

.as_agent() — Een werkstroom gebruiken als agent

Wikkel een FunctionalWorkflow in als een agent-compatibel object met .as_agent():

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent() retourneert een FunctionalWorkflowAgent die dezelfde run() interface biedt als andere agentobjecten, waardoor functionele werkstromen samengesteld kunnen worden met elk systeem dat agenten accepteert.

Parameter Type Description
name str | None Weergavenaam voor de agent. De standaardinstelling is de naam van de werkstroom.

Zie python/samples/03-workflows/functional/agent_integration.py voor een voorbeeld.

Samples

Uitvoerbare voorbeelden bevinden zich in de volgende voorbeeldmappen:

Volgende stappen 

Verwante onderwerpen:

De functionele werkstroom-API is momenteel niet beschikbaar voor C#.