Zustandsbehaftete Verarbeitung mit Grenzwerten optimieren

Um die im Zustand gespeicherten Daten effektiv zu verwalten, verwenden Sie Grenzwerte bei der Durchführung der zustandsbehafteten Datenstromverarbeitung in Lakeflow Spark Declarative Pipelines, einschließlich Aggregationen, Verknüpfungen und Deduplizierung. In den folgenden Abschnitten wird gezeigt, wie Wasserzeichen in Ihren Pipelineabfragen mit Beispielen für die empfohlenen Vorgänge angewendet werden.

Weitere Informationen zu Streamingtabellen finden Sie unter Streamingtabellen.

Hinweis

Um sicherzustellen, dass Abfragen, die Aggregationen ausführen, inkrementell verarbeitet und nicht vollständig mit jedem Update neu komputiert werden, müssen Sie Wasserzeichen verwenden.

Was ist ein Wasserzeichen?

Bei der Datenstromverarbeitung ist ein Wasserzeichen ein Feature von Apache Spark, das einen zeitbasierten Schwellenwert für die Verarbeitung von Daten definieren kann, wenn zustandsbehaftete Vorgänge, wie z.B. Aggregationen, ausgeführt werden. Die eingehenden Daten werden verarbeitet, bis der Schwellenwert erreicht ist, zu dem zeitpunkt, an dem das durch den Schwellenwert definierte Zeitfenster geschlossen wird. Wasserzeichen können verwendet werden, um Probleme während der Abfrageverarbeitung zu vermeiden, hauptsächlich bei der Verarbeitung größerer Datasets oder langer Verarbeitung. Diese Probleme können eine hohe Wartezeit bei der Erstellung von Ergebnissen und sogar Fehler durch nicht genügend Arbeitsspeicher (OOM) aufgrund der Datenmenge umfassen, die während der Verarbeitung im Zustand gespeichert sind. Da Streamingdaten inhärent ungeordnet sind, unterstützen Wasserzeichen auch die ordnungsgemäße Berechnung von Vorgängen wie Zeitfensteraggregationen.

Weitere Informationen zur Verwendung von Wasserzeichen in der Datenstromverarbeitung finden Sie unter Wasserzeichen in Apache Spark Structured Streaming und Anwenden von Wasserzeichen zur Steuerung von Datenverarbeitungsschwellenwerten.

Wie definieren Sie ein Wasserzeichen?

Sie definieren ein Wasserzeichen, indem Sie ein Zeitstempelfeld und einen Wert angeben, der den Zeitschwellenwert für verspätete Daten darstellt. Daten werden verspätet betrachtet, wenn sie nach dem definierten Zeitschwellenwert eintreffen. Wenn der Schwellenwert beispielsweise als 10 Minuten definiert ist, werden Datensätze, die nach dem Schwellenwert von 10 Minuten ankommen, möglicherweise verworfen.

Da Datensätze, die nach dem definierten Schwellenwert eingehen, möglicherweise verworfen werden, ist es wichtig, einen Schwellenwert auszuwählen, der Ihre Latenz im Vergleich zu den Korrektheitsanforderungen erfüllt. Die Auswahl eines kleineren Grenzwerts führt dazu, dass Datensätze früher ausgegeben werden, bedeutet aber auch, dass verzögerte Datensätze eher verworfen werden. Ein größerer Schwellenwert bedeutet eine längere Wartezeit, aber möglicherweise mehr Vollständigkeit der Daten. Aufgrund der größeren Zustandsgröße kann ein größerer Schwellenwert auch zusätzliche Computerressourcen erfordern. Da der Schwellenwert von Ihren Daten- und Verarbeitungsanforderungen abhängt, ist es wichtig, ihre Verarbeitung zu testen und zu überwachen, um einen optimalen Schwellenwert zu ermitteln.

Sie verwenden die withWatermark() Funktion in Python, um ein Wasserzeichen zu definieren. Verwenden Sie in SQL die WATERMARK Klausel, um ein Wasserzeichen zu definieren:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Verwenden von Grenzwerten mit Datenstromverknüpfungen

Bei Datenstromverknüpfungen müssen Sie ein Wasserzeichen auf beiden Seiten der Verknüpfung und eine Zeitintervallklausel definieren. Da jede Verknüpfungsquelle eine unvollständige Ansicht der Daten enthält, ist die Zeitintervallklausel erforderlich, um dem Streamingmodul mitzuteilen, wann keine weiteren Übereinstimmungen vorgenommen werden können. Die Zeitintervallklausel muss dieselben Felder verwenden, die zum Definieren der Wasserzeichen verwendet werden.

Da es vorkommen kann, dass für jeden Datenstrom unterschiedliche Schwellenwerte für Wasserzeichen erforderlich sind, müssen die Ströme nicht über die gleichen Schwellenwerte verfügen. Um fehlende Daten zu vermeiden, verwaltet das Streamingmodul ein globales Wasserzeichen basierend auf dem langsamsten Datenstrom.

Im folgenden Beispiel wird ein Stream von Anzeigenimpressionen und ein Stream von Benutzerklicks auf Anzeigen verbunden. In diesem Beispiel muss innerhalb von 3 Minuten nach dem Eindruck ein Klick erfolgen. Nach Ablauf des dreiminütigen Zeitintervalls werden Zeilen aus dem Zustand gelöscht, die nicht mehr abgeglichen werden können.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Durchführen von Aggregationen im Fenstermodus mit Grenzwerten

Ein häufiger zustandsbehafteter Vorgang bei Streamingdaten ist eine fensterbasierte Aggregation. Fensteraggregationen ähneln gruppierten Aggregationen, mit der Ausnahme, dass Aggregatwerte für den Satz von Zeilen zurückgegeben werden, die Teil des definierten Fensters sind.

Ein Fenster kann als bestimmte Länge definiert werden, und ein Aggregationsvorgang kann für alle Zeilen ausgeführt werden, die Teil dieses Fensters sind. Spark Streaming unterstützt drei Arten von Fenstern:

  • Rollierende (feste) Fenster: Eine Reihe nicht überlappender und zusammenhängender Zeitintervalle mit einer festen Größe. Ein Eingabedatensatz gehört nur zu einem einzelnen Fenster.
  • Gleitfenster: Ähnlich wie bei stürzenden Fenstern sind gleitende Fenster fester Größe, fenster können sich jedoch überlappen, und ein Datensatz kann in mehrere Fenster fallen.

Wenn Daten über das Ende des Fensters und die Länge des Wasserzeichens hinaus gelangen, werden keine neuen Daten für das Fenster akzeptiert, das Ergebnis der Aggregation wird ausgegeben, und der Zustand für das Fenster wird gelöscht.

Im folgenden Beispiel wird alle 5 Minuten mithilfe eines festen Fensters eine Summe der Impressionen berechnet. In diesem Beispiel verwendet die Auswahlklausel den Alias impressions_window, und dann wird das Fenster selbst als Teil der GROUP BY Klausel definiert. Das Fenster muss auf derselben Zeitstempelspalte basieren wie das Wasserzeichen, die clickTimestamp Spalte in diesem Beispiel.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Ein ähnliches Python-Beispiel, in dem der Gewinn stündlich über feste Fenster berechnet wird:

from pyspark import pipelines as dp

@dp.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplizieren von Streamingdatensätzen

Strukturiertes Streaming bietet genau einmal Verarbeitungsgarantien, führt aber nicht automatisch zu Duplikaten von Datensätzen aus Datenquellen. Beispiel: Da viele Nachrichtenwarteschlangen At-Least-Once-Garantien aufweisen, ist beim Lesen aus einer dieser Nachrichtenwarteschlangen mit doppelten Datensätzen zu rechnen. Sie können die dropDuplicatesWithinWatermark() Funktion verwenden, um Datensätze für jedes angegebene Feld zu deduplizieren und Duplikate aus einem Datenstrom zu entfernen, auch wenn sich einige Felder unterscheiden (z. B. Ereigniszeit oder Ankunftszeit). Sie müssen ein Wasserzeichen angeben, das die dropDuplicatesWithinWatermark() Funktion verwenden soll. Alle doppelten Daten, die innerhalb des durch das Wasserzeichen angegebenen Zeitraums eingehen, werden gelöscht.

Sortierte Daten sind wichtig, da unsortierte Daten dazu führen, dass der Grenzwert nicht richtig fortgeführt wird. Wenn dann ältere Daten eintreffen, wird sie als verspätet betrachtet und verworfen. Verwenden Sie die withEventTimeOrder Option, um die anfängliche Momentaufnahme basierend auf dem im Wasserzeichen angegebenen Zeitstempel in der Reihenfolge zu verarbeiten. Die withEventTimeOrder Option kann im Code deklariert werden, der das Dataset definiert, oder in den Pipelineeinstellungen mithilfe spark.databricks.delta.withEventTimeOrder.enabledvon . Beispiel:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Hinweis

Die withEventTimeOrder Option wird nur mit Python unterstützt.

Im folgenden Beispiel werden die Daten geordnet nach clickTimestamp verarbeitet, und Datensätze, die innerhalb von 5 Sekunden voneinander ankommen und die doppelten userId- und clickAdId-Spalten enthalten, werden verworfen.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimieren der Pipelinekonfiguration für zustandsbehaftete Verarbeitung

Um Produktionsprobleme und übermäßige Latenz zu verhindern, empfiehlt Databricks die Aktivierung der RocksDB-basierten Zustandsverwaltung für die zustandsbehaftete Datenstromverarbeitung, insbesondere, wenn ihre Verarbeitung eine große Menge an Zwischenzustand spart.

Serverlose Pipelines verwalten automatisch Zustandsspeicherkonfigurationen.

Sie können die RocksDB-basierte Zustandsverwaltung aktivieren, indem Sie die folgende Konfiguration festlegen, bevor Sie eine Pipeline bereitstellen:

{
  "configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Weitere Informationen zum RocksDB-Zustandsspeicher, einschließlich Konfigurationsempfehlungen für RocksDB, finden Sie unter Configure RocksDB state store on Azure Databricks.

Informationen zu zustandsbehafteten Vorgängen, die Millisekundenlatenz erfordern, finden Sie unter Verwenden des Echtzeitmodus in Lakeflow Spark Declarative Pipelines.