非同期の進行状況の追跡

非同期進行状況の追跡では、クエリでチェックポイントの進行状況を非同期的に更新し、各マイクロバッチ内のデータを処理できるようにすることで、構造化ストリーミング パイプラインの待機時間が短縮されます。

クエリ処理中、構造化ストリーミングはオフセットを保持および管理して、各マイクロバッチの offsetLogcommitLog のクエリの進行状況を測定します。 非同期の進行状況の追跡がないと、オフセット管理操作は処理の待機時間に直接影響します。データ処理は完了するまで続行できないためです。

非同期進行状況追跡

手記

非同期の進行状況の追跡は、 Trigger.once または Trigger.availableNow トリガーと互換性がありません。 有効にすると、 Trigger.once または Trigger.availableNow を含む構造化ストリーミング クエリは失敗します。

コンフィギュレーション オプション

オプション 既定値 説明
asyncProgressTrackingEnabled false 非同期の進行状況の追跡を有効にするかどうかを指定します。
asyncProgressTrackingCheckpointIntervalMs 1000 オフセットの書き込みと完了コミットの間の間隔 (ミリ秒単位)。

非同期の進行状況の追跡を有効にする

非同期の進行状況の追跡を有効にするには、 asyncProgressTrackingEnabledtrueに設定します。

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

チェックポイントの頻度でスループットを向上させる

既定のチェックポイントの頻度 1000 ミリ秒は、ほとんどのクエリに適したスループットを持っています。 オフセット管理操作が非同期の進行状況の追跡で処理できるよりも速い場合は、オフセット管理操作のバックログが作成されます。 バックログがさらに増加するのを防ぐために、非同期の進行状況の追跡によってデータ処理がブロックされたり遅くなったりする可能性があるため、予想される待機時間の利点が損なわれる可能性があります。

このシナリオでは、Databricks ではチェックポイント間隔を長くすることをお勧めします。

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

手記

障害復旧時間は、チェックポイント間隔の時間と共に増加します。 障害が発生した場合、パイプラインは、前の正常なチェックポイント以降のすべてのデータを再処理する必要があります。 運用環境でこの変更を行う前に、障害が発生した場合の復旧時間と比較して、通常の処理中の待機時間が短い間のトレードオフを考慮してください。

非同期の進行状況の追跡をオフにする

非同期進行状況の追跡が有効になっている場合、ストリームではすべてのバッチのチェックポイントの進行状況が保証されるわけではありません。 この機能をオフにするには、事前に進捗をチェックポイントに保存する必要があります。

オフにするには、次の手順に従います。

  1. asyncProgressTrackingEnabledtrue に設定し、asyncProgressTrackingCheckpointIntervalMs0 に設定して、少なくとも 2 つのマイクロバッチを処理します。

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. クエリを停止します。

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. 非同期の進行状況の追跡をオフにして、クエリを再起動します。

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

上記の手順に従わずに非同期の進行状況の追跡をオフにすると、次のエラーが発生する可能性があります。

java.lang.IllegalStateException: batch x doesn't exist

ドライバー ログに、次のエラーが表示される場合があります。

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

制限事項

  • Kafka シンクの場合、非同期の進行状況の追跡ではステートレス パイプラインのみがサポートされます。
  • 非同期の進行状況の追跡では、バッチのオフセット範囲が障害時に変更される可能性があるため、エンドツーエンドの処理が正確に 1 回保証されるわけではありません。 Kafka などの一部のシンクでは、厳密に 1 回の保証は提供されません。