foreach_batch_sink

Dekoratören @dp.foreach_batch_sink() definierar en ForEachBatch-mottagare som bearbetar en ström som en serie mikrobatcher som du hanterar i Python med anpassad logik. Du refererar till mottagaren som ett target i ett tilläggsflöde för att skriva transformerade data. Konceptuell vägledning, överväganden och exempel finns i Använda ForEachBatch för att skriva till godtyckliga datamottagare i pipelines.

Syntax

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.

Parameters

Parameter Description
name Optional. Ett unikt namn för att identifiera slutpunkten i pipelinen. Använder UDF-namnet som standard om det inte anges.
batch_handler Det här är den användardefinierade funktion (UDF) som anropas för varje mikrobatch.
Df Spark DataFrame som innehåller data för den aktuella mikrobatchen.
batch_id Heltals-ID för mikrobatchen. Spark ökar detta ID för varje utlösarintervall.
En batch_id av 0 representerar början av en dataström eller början på en fullständig uppdatering. Koden foreach_batch_sink bör korrekt hantera en fullständig uppdatering för nedströms datakällor. Mer information finns i Fullständig uppdatering.