関数型ワークフロー API

Warning

機能ワークフロー API は 試験的 であり、将来のバージョンでは予告なしに変更または削除される可能性があります。

関数型ワークフロー API を使用すると、単純なPython非同期関数としてワークフローを記述できます。 Executor クラスを定義する代わりに、 エッジを配線し、WorkflowBuilder を使用して、async 関数を @workflow で装飾し、ネイティブ Python制御フロー (if/elsefor ループ、asyncio.gather) を使用してロジックを表現します。

グラフ API とサイド バイ サイドの比較については、「ワークフローの概要 」の「ワークフロー API」を 参照してください。

@workflow デコーダー

@workflowasync関数に適用して、FunctionalWorkflow オブジェクトに変換します。

from agent_framework import workflow

@workflow
async def text_pipeline(text: str) -> str:
    upper = await to_upper_case(text)
    return await reverse_text(upper)

@workflow デコレーターは、省略可能な引数を持つパラメーター化された形式をサポートしています。

from agent_framework import InMemoryCheckpointStorage, workflow

storage = InMemoryCheckpointStorage()

@workflow(name="my_pipeline", description="Uppercase then reverse", checkpoint_storage=storage)
async def text_pipeline(text: str) -> str:
    ...

@workflow パラメーター

パラメーター タイプ 説明
name str | None ワークフローの表示名。 既定値は関数の __name__です。
description str | None 人間が判読できる省略可能な説明。
checkpoint_storage CheckpointStorage | None 実行の間にステップの結果を保持するための既定のストレージ。 run()の呼び出しごとにオーバーライドできます。

ワークフロー関数シグネチャ

ワークフロー関数の最初の パラメーター は、 .run()に渡された入力を受け取ります。 HITL、キー/値の状態、またはカスタム イベントが必要な場合にのみ、 ctx: WorkflowRunContext パラメーターを追加します。それ以外の場合は省略可能です。

# No ctx needed — just a plain pipeline
@workflow
async def simple_pipeline(data: str) -> str:
    result = await process(data)
    return result

# ctx needed for HITL, state, or custom events
@workflow
async def hitl_pipeline(data: str, ctx: WorkflowRunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback

WorkflowRunContext は最初に型注釈によって検出され、次にパラメーター名 ctxによって検出されるため、 ctx: WorkflowRunContext パラメーターとベア ctx パラメーターの両方が機能します。

ワークフローの実行

.run()FunctionalWorkflowによって返される@workflowオブジェクトで呼び出します。

# Calling the decorated function directly returns the raw return value
raw = await text_pipeline("hello world")   # str — the raw return value

# .run() wraps the result in a WorkflowRunResult with events and state
result = await text_pipeline.run("hello world")
print(result.text)                # first output as a string
print(result.get_outputs())       # list of all outputs
print(result.get_final_state())   # WorkflowRunState.IDLE

run() パラメーター

パラメーター タイプ 説明
message Any | None ワークフロー関数に最初の引数として渡される入力。
stream bool True場合は、ResponseStreamオブジェクトを生成するWorkflowEventを返します。 既定値は False です。
responses dict[str, Any] | None request_idによってキーが設定された HITL 応答。 中断されたワークフローを再開するために使用されます。
checkpoint_id str | None 復元するチェックポイント。 checkpoint_storage を設定する必要があります。
checkpoint_storage CheckpointStorage | None この実行のデコレーターの既定のストレージ セットをオーバーライドします。
include_status_events bool ストリーミング以外の結果に状態変更イベントを含めます。

呼び出しごとに、 messageresponses、または checkpoint_id のいずれかを指定する必要があります。

WorkflowRunResult

run() (非ストリーミング) は WorkflowRunResultを返します。 主なメソッド:

メソッド/プロパティ 返品 説明
.text str 最初の出力は文字列です。 文字列が出力されない場合は空の文字列。
.get_outputs() list[Any] ワークフローによって出力されるすべての出力。
.get_final_state() WorkflowRunState 最終的な実行状態 (IDLEIDLE_WITH_PENDING_REQUESTSFAILED、...)。
.get_request_info_events() list[WorkflowEvent] 状態が IDLE_WITH_PENDING_REQUESTS の場合の保留中のHITLリクエスト。

ストリーミング

生成されたイベントを受け取るために stream=True を渡します。

from agent_framework import workflow

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return await transform_data(raw)

# stream=True returns a ResponseStream you iterate with async for
stream = data_pipeline.run("https://example.com/api/data", stream=True)
async for event in stream:
    if event.type == "output":
        print(f"Output: {event.data}")

# After iteration, get_final_response() returns the WorkflowRunResult
result = await stream.get_final_response()
print(f"Final state: {result.get_final_state()}")

完全な例については、 python/samples/03-workflows/functional/basic_streaming_pipeline.py を参照してください。

@step デコーダー

@step は、個々の非同期関数に結果キャッシュ、イベントの生成、およびステップごとのチェックポイント処理を追加するオプトイン デコレーターです。

from agent_framework import step, workflow

@step
async def fetch_data(url: str) -> dict:
    # expensive — hits a real API
    return await http_get(url)

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch_data(url)
    return process(raw)

ワークフロー内での @step の動作

  • 結果をキャッシュします 。結果は (step_name, call_index)によって格納されます。 HITL の再開またはチェックポイントの復元では、完了したステップは、再実行するのではなく、保存された結果を即座に返します。
  • イベントを出力しますexecutor_invoked / executor_completed / executor_failed は、監視のために生成されます。 キャッシュ ヒット時に、代わりに executor_bypassed が生成されます。
  • チェックポイントを保存します 。ワークフローに checkpoint_storageがある場合は、各ステップの完了後にチェックポイントが保存されます。
  • WorkflowRunContextを挿入しますステップ関数がctx: WorkflowRunContextパラメーターを宣言すると、アクティブなコンテキストが自動的に挿入されます。

実行中のワークフローの外部では、 @step は透過的であり、関数は非コーディングバージョンと同じように動作し、分離して完全にテストできます。

どのようなときに @step を使用するか

@stepがかかる関数 (エージェント呼び出し、外部 API 要求、再開時の再実行にコストがかかる、または副作用がある操作) に対してを使用します。 プレーン関数 ( @stepなし) は、 @workflow内で引き続き機能します。ワークフローが再開されると、単に再実行されます。

from agent_framework import InMemoryCheckpointStorage, step, workflow

storage = InMemoryCheckpointStorage()

@step  # cached — won't re-run on resume
async def call_llm(prompt: str) -> str:
    return (await agent.run(prompt)).text

# No @step — cheap, fine to re-run
async def validate(text: str) -> bool:
    return len(text) > 0

@workflow(checkpoint_storage=storage)
async def pipeline(topic: str) -> str:
    draft = await call_llm(f"Write about: {topic}")
    ok = await validate(draft)
    return draft if ok else ""

@step では、 name パラメーターも受け取ります。

@step(name="transform")
async def transform_data(raw: dict) -> str:
    ...

完全な例については、 python/samples/03-workflows/functional/steps_and_checkpointing.py を参照してください。

WorkflowRunContext

WorkflowRunContext (短いエイリアス: RunContext)は、ワークフローおよびステップ関数に挿入される実行コンテキストです。 HITL、キー/値の状態、またはカスタム イベントを使用する場合にのみ必要です。

agent_frameworkからインポートします。

from agent_framework import WorkflowRunContext, workflow

ctx.request_info() — 人間を介在させたプロセス

ctx.request_info() は、ワークフローを中断して外部入力を待機します。

@workflow
async def review_pipeline(topic: str, ctx: WorkflowRunContext) -> str:
    draft = await write_draft(topic)
    feedback = await ctx.request_info(
        {"draft": draft, "instructions": "Please review this draft"},
        response_type=str,
        request_id="review_request",
    )
    return await revise_draft(draft, feedback)

パラメーター:

パラメーター タイプ 説明
request_data Any 必要な入力を記述するペイロード (dict、Pydantic モデル、文字列、...)。
response_type type 期待される応答のPythonの型。
request_id str | None このリクエストの永続的な識別子。 省略すると、ランダムな UUID が生成されます。

再生セマンティクス: 最初の実行時に、 request_info() はワークフローを中断する内部シグナルを発生させます (コードには表示されません)。 呼び出し元は、WorkflowRunResultを含むget_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTSを受け取ります。 .run(responses={request_id: value})を呼び出して再開します。ワークフローは一番上から再実行され、request_info()はすぐに指定された値が返されます。

@step中断の前に実行された -decorated 関数は、再実行するのではなく、再開時にキャッシュされた結果を返します。

応答の処理:

# Phase 1 — run until the workflow pauses
result1 = await review_pipeline.run("AI Safety")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
print(requests[0].request_id)  # "review_request"

# Phase 2 — resume with the human's answer
result2 = await review_pipeline.run(
    responses={"review_request": "Add more details about alignment research"}
)
print(result2.text)

完全な例については、 python/samples/03-workflows/functional/hitl_review.py を参照してください。

ctx.request_info() は、 @step 関数内でもサポートされています。

ctx.add_event() — カスタム イベント

ctx.add_event()を使用して、フレームワーク ライフサイクル イベントと共にアプリケーション固有のイベントを出力します。 詳細と例については、「 カスタム イベントの出力」を参照してください。

ctx.get_state() / ctx.set_state() — キー/値の状態

ctx.get_state()ctx.set_state()を使用して、HITL 中断間で保持され、チェックポイントに含まれる値を格納します。 詳細については、「ワークフローの 状態」を参照してください。

チェックポイント ストレージが構成されている場合、状態値は JSON シリアル化可能である必要があります。

ctx.is_streaming()

現在の実行が True で開始されたときのstream=Trueを返します。 ストリーミング モードに基づいて動作を調整するステップ関数内で便利です。

get_run_context()

実行中のワークフロー内の任意の場所からアクティブな WorkflowRunContext を取得します。 ctx パラメーターを宣言しないヘルパー関数で役立ちます。

from agent_framework import get_run_context

async def helper():
    ctx = get_run_context()
    if ctx is not None:
        ctx.set_state("helper_ran", True)

実行中のワークフローの外部で呼び出されたときに None を返します。

並列処理 asyncio.gather

ファンアウト/ファンインには標準のPythonコンカレンシーを使用します。フレームワーク プリミティブは必要ありません。

import asyncio
from agent_framework import workflow

@workflow
async def research_pipeline(topic: str) -> str:
    web, papers, news = await asyncio.gather(
        research_web(topic),
        research_papers(topic),
        research_news(topic),
    )
    return await synthesize([web, papers, news])

asyncio.gather 関数が @stepで装飾されている場合にも機能します。

完全な例については、 python/samples/03-workflows/functional/parallel_pipeline.py を参照してください。

ワークフロー内のエージェントの呼び出し

エージェント呼び出しは、 @workflow内でプレーン関数呼び出しとして機能します。

from agent_framework import Agent, workflow

writer = Agent(name="WriterAgent", instructions="Write a short poem.", client=client)
reviewer = Agent(name="ReviewerAgent", instructions="Review the poem.", client=client)

@workflow
async def poem_workflow(topic: str) -> str:
    poem = (await writer.run(f"Write a poem about: {topic}")).text
    review = (await reviewer.run(f"Review this poem: {poem}")).text
    return f"Poem:\n{poem}\n\nReview: {review}"

HITL の再開またはチェックポイント復元で結果をキャッシュする場合は、エージェント呼び出し関数に @step を追加します。

from agent_framework import step

@step
async def write_poem(topic: str) -> str:
    return (await writer.run(f"Write a poem about: {topic}")).text

完全な例については、 python/samples/03-workflows/functional/agent_integration.py を参照してください。

.as_agent() — エージェントとしてのワークフローの使用

FunctionalWorkflowを使用して、.as_agent()をエージェント互換オブジェクトとしてラップします。

from agent_framework import workflow

@workflow
async def poem_workflow(topic: str) -> str:
    ...

# Wrap as an agent
agent = poem_workflow.as_agent(name="PoemAgent")

# Use with the standard agent interface
response = await agent.run("Write a poem about the ocean")
print(response.text)

# Or use in a larger workflow or orchestration

.as_agent()は、他のエージェント オブジェクトと同じFunctionalWorkflowAgent インターフェイスを公開するrun()を返し、エージェントを受け入れる任意のシステムで機能ワークフローを構成できるようにします。

パラメーター タイプ 説明
name str | None エージェントの表示名。 既定値はワークフロー名です。

例については、 python/samples/03-workflows/functional/agent_integration.py を参照してください。

Samples

実行可能な例は、次のサンプル フォルダーにあります。

次のステップ

関連トピック:

現時点では、関数型ワークフロー API は C# では使用できません。